14
0
forked from pool/python-Scrapy
Files
python-Scrapy/add-peak-method-to-queues.patch
Matej Cepl ed9c5a3da0 Accepting request 917688 from home:fusionfuture:branches:devel:languages:python
- Remove h2 < 4.0 dependency version restriction. (boo#1190035)
  * remove-h2-version-restriction.patch
- Add peek method to queues to fix build with queuelib 1.6.2.
  * add-peak-method-to-queues.patch
- Drop support for Python 3.6 as python-uvloop does not support it.
- Require testfixtures >= 6.0.0 (tests need LogCapture.check_present).
  (2953bb4caa)

OBS-URL: https://build.opensuse.org/request/show/917688
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-Scrapy?expand=0&rev=20
2021-09-09 12:02:15 +00:00

582 lines
21 KiB
Diff

--- a/scrapy/pqueues.py
+++ b/scrapy/pqueues.py
@@ -3,6 +3,7 @@ import logging
from scrapy.utils.misc import create_instance
+
logger = logging.getLogger(__name__)
@@ -17,8 +18,7 @@ def _path_safe(text):
>>> _path_safe('some@symbol?').startswith('some_symbol_')
True
"""
- pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_'
- for c in text])
+ pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_' for c in text])
# as we replace some letters we can get collision for different slots
# add we add unique part
unique_slot = hashlib.md5(text.encode('utf8')).hexdigest()
@@ -35,6 +35,9 @@ class ScrapyPriorityQueue:
* close()
* __len__()
+ Optionally, the queue could provide a ``peek`` method, that should return the
+ next object to be returned by ``pop``, but without removing it from the queue.
+
``__init__`` method of ScrapyPriorityQueue receives a downstream_queue_cls
argument, which is a class used to instantiate a new (internal) queue when
a new priority is allocated.
@@ -70,10 +73,12 @@ class ScrapyPriorityQueue:
self.curprio = min(startprios)
def qfactory(self, key):
- return create_instance(self.downstream_queue_cls,
- None,
- self.crawler,
- self.key + '/' + str(key))
+ return create_instance(
+ self.downstream_queue_cls,
+ None,
+ self.crawler,
+ self.key + '/' + str(key),
+ )
def priority(self, request):
return -request.priority
@@ -99,6 +104,18 @@ class ScrapyPriorityQueue:
self.curprio = min(prios) if prios else None
return m
+ def peek(self):
+ """Returns the next object to be returned by :meth:`pop`,
+ but without removing it from the queue (``None`` when no current priority is set).
+
+ Raises :exc:`NotImplementedError` if the underlying queue class does
+ not implement a ``peek`` method, which is optional for queues.
+ """
+ if self.curprio is None:  # no priority is currently selected, so there is nothing to peek at
+ return None
+ queue = self.queues[self.curprio]  # internal queue holding the current priority
+ return queue.peek()  # delegate; the internal queue decides how peek behaves
+
def close(self):
active = []
for p, q in self.queues.items():
@@ -116,8 +133,7 @@ class DownloaderInterface:
self.downloader = crawler.engine.downloader
def stats(self, possible_slots):
- return [(self._active_downloads(slot), slot)
- for slot in possible_slots]
+ return [(self._active_downloads(slot), slot) for slot in possible_slots]
def get_slot_key(self, request):
return self.downloader._get_slot_key(request, None)
@@ -162,10 +178,12 @@ class DownloaderAwarePriorityQueue:
self.pqueues[slot] = self.pqfactory(slot, startprios)
def pqfactory(self, slot, startprios=()):
- return ScrapyPriorityQueue(self.crawler,
- self.downstream_queue_cls,
- self.key + '/' + _path_safe(slot),
- startprios)
+ return ScrapyPriorityQueue(
+ self.crawler,
+ self.downstream_queue_cls,
+ self.key + '/' + _path_safe(slot),
+ startprios,
+ )
def pop(self):
stats = self._downloader_interface.stats(self.pqueues)
@@ -187,9 +205,22 @@ class DownloaderAwarePriorityQueue:
queue = self.pqueues[slot]
queue.push(request)
+ def peek(self):
+ """Returns the next object to be returned by :meth:`pop`,
+ but without removing it from the queue (``None`` when there are no slot stats).
+
+ Raises :exc:`NotImplementedError` if the underlying queue class does
+ not implement a ``peek`` method, which is optional for queues.
+ """
+ stats = self._downloader_interface.stats(self.pqueues)
+ if not stats:  # nothing reported for any slot, so there is no queue to look into
+ return None
+ slot = min(stats)[1]  # presumably (active downloads, slot) pairs, so this picks the least busy slot - confirm
+ queue = self.pqueues[slot]
+ return queue.peek()
+
def close(self):
- active = {slot: queue.close()
- for slot, queue in self.pqueues.items()}
+ active = {slot: queue.close() for slot, queue in self.pqueues.items()}
self.pqueues.clear()
return active
--- a/scrapy/squeues.py
+++ b/scrapy/squeues.py
@@ -19,7 +19,6 @@ def _with_mkdir(queue_class):
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname, exist_ok=True)
-
super().__init__(path, *args, **kwargs)
return DirectoriesCreated
@@ -38,6 +37,20 @@ def _serializable_queue(queue_class, ser
if s:
return deserialize(s)
+ def peek(self):
+ """Returns the next object to be returned by :meth:`pop`,
+ but without removing it from the queue (``None`` when the parent ``peek`` returns a falsy value).
+
+ Raises :exc:`NotImplementedError` if the underlying queue class does
+ not implement a ``peek`` method, which is optional for queues.
+ """
+ try:
+ s = super().peek()
+ except AttributeError as ex:  # taken to mean that the parent class has no ``peek`` attribute
+ raise NotImplementedError("The underlying queue class does not implement 'peek'") from ex
+ if s:  # falsy result falls through and the method implicitly returns None
+ return deserialize(s)
+
return SerializableQueue
@@ -59,12 +72,21 @@ def _scrapy_serialization_queue(queue_cl
def pop(self):
request = super().pop()
-
if not request:
return None
+ return request_from_dict(request, self.spider)
- request = request_from_dict(request, self.spider)
- return request
+ def peek(self):
+ """Returns the next object to be returned by :meth:`pop`,
+ but without removing it from the queue (``None`` when the parent ``peek`` returns a falsy value).
+
+ Raises :exc:`NotImplementedError` if the underlying queue class does
+ not implement a ``peek`` method, which is optional for queues.
+ """
+ request = super().peek()  # no handling here; the NotImplementedError is presumably raised by the parent's peek
+ if not request:
+ return None
+ return request_from_dict(request, self.spider)  # same conversion that pop applies to its result
return ScrapyRequestQueue
@@ -76,6 +98,19 @@ def _scrapy_non_serialization_queue(queu
def from_crawler(cls, crawler, *args, **kwargs):
return cls()
+ def peek(self):
+ """Returns the next object to be returned by :meth:`pop`,
+ but without removing it from the queue (the parent's ``peek`` result is returned unchanged).
+
+ Raises :exc:`NotImplementedError` if the underlying queue class does
+ not implement a ``peek`` method, which is optional for queues.
+ """
+ try:
+ s = super().peek()
+ except AttributeError as ex:  # taken to mean that the parent class has no ``peek`` attribute
+ raise NotImplementedError("The underlying queue class does not implement 'peek'") from ex
+ return s
+
return ScrapyRequestQueue
@@ -109,17 +144,9 @@ MarshalLifoDiskQueueNonRequest = _serial
marshal.loads
)
-PickleFifoDiskQueue = _scrapy_serialization_queue(
- PickleFifoDiskQueueNonRequest
-)
-PickleLifoDiskQueue = _scrapy_serialization_queue(
- PickleLifoDiskQueueNonRequest
-)
-MarshalFifoDiskQueue = _scrapy_serialization_queue(
- MarshalFifoDiskQueueNonRequest
-)
-MarshalLifoDiskQueue = _scrapy_serialization_queue(
- MarshalLifoDiskQueueNonRequest
-)
+PickleFifoDiskQueue = _scrapy_serialization_queue(PickleFifoDiskQueueNonRequest)
+PickleLifoDiskQueue = _scrapy_serialization_queue(PickleLifoDiskQueueNonRequest)
+MarshalFifoDiskQueue = _scrapy_serialization_queue(MarshalFifoDiskQueueNonRequest)
+MarshalLifoDiskQueue = _scrapy_serialization_queue(MarshalLifoDiskQueueNonRequest)
FifoMemoryQueue = _scrapy_non_serialization_queue(queue.FifoMemoryQueue)
LifoMemoryQueue = _scrapy_non_serialization_queue(queue.LifoMemoryQueue)
--- /dev/null
+++ b/tests/test_pqueues.py
@@ -0,0 +1,144 @@
+import tempfile
+import unittest
+
+import queuelib
+
+from scrapy.http.request import Request
+from scrapy.pqueues import ScrapyPriorityQueue, DownloaderAwarePriorityQueue
+from scrapy.spiders import Spider
+from scrapy.squeues import FifoMemoryQueue
+from scrapy.utils.test import get_crawler
+
+from tests.test_scheduler import MockDownloader, MockEngine
+
+
+class PriorityQueueTest(unittest.TestCase):
+ def setUp(self):
+ self.crawler = get_crawler(Spider)
+ self.spider = self.crawler._create_spider("foo")
+
+ def test_queue_push_pop_one(self):
+ temp_dir = tempfile.mkdtemp()
+ queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir)
+ self.assertIsNone(queue.pop())
+ self.assertEqual(len(queue), 0)
+ req1 = Request("https://example.org/1", priority=1)
+ queue.push(req1)
+ self.assertEqual(len(queue), 1)
+ dequeued = queue.pop()
+ self.assertEqual(len(queue), 0)
+ self.assertEqual(dequeued.url, req1.url)
+ self.assertEqual(dequeued.priority, req1.priority)
+ self.assertEqual(queue.close(), [])
+
+ def test_no_peek_raises(self):  # peek must fail loudly when queuelib provides no peek
+ if hasattr(queuelib.queue.FifoMemoryQueue, "peek"):  # only meaningful on queuelib releases without peek
+ raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is defined")
+ temp_dir = tempfile.mkdtemp()
+ queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir)
+ queue.push(Request("https://example.org"))
+ with self.assertRaisesRegex(NotImplementedError, "The underlying queue class does not implement 'peek'"):
+ queue.peek()  # assertRaises(msg=...) only labels a failure; the regex form checks the exception text
+ queue.close()
+
+ def test_peek(self):
+ if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"):
+ raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is undefined")
+ temp_dir = tempfile.mkdtemp()
+ queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir)
+ self.assertEqual(len(queue), 0)
+ self.assertIsNone(queue.peek())
+ req1 = Request("https://example.org/1")
+ req2 = Request("https://example.org/2")
+ req3 = Request("https://example.org/3")
+ queue.push(req1)
+ queue.push(req2)
+ queue.push(req3)
+ self.assertEqual(len(queue), 3)
+ self.assertEqual(queue.peek().url, req1.url)
+ self.assertEqual(queue.pop().url, req1.url)
+ self.assertEqual(len(queue), 2)
+ self.assertEqual(queue.peek().url, req2.url)
+ self.assertEqual(queue.pop().url, req2.url)
+ self.assertEqual(len(queue), 1)
+ self.assertEqual(queue.peek().url, req3.url)
+ self.assertEqual(queue.pop().url, req3.url)
+ self.assertEqual(queue.close(), [])
+
+ def test_queue_push_pop_priorities(self):
+ temp_dir = tempfile.mkdtemp()
+ queue = ScrapyPriorityQueue.from_crawler(self.crawler, FifoMemoryQueue, temp_dir, [-1, -2, -3])
+ self.assertIsNone(queue.pop())
+ self.assertEqual(len(queue), 0)
+ req1 = Request("https://example.org/1", priority=1)
+ req2 = Request("https://example.org/2", priority=2)
+ req3 = Request("https://example.org/3", priority=3)
+ queue.push(req1)
+ queue.push(req2)
+ queue.push(req3)
+ self.assertEqual(len(queue), 3)
+ dequeued = queue.pop()
+ self.assertEqual(len(queue), 2)
+ self.assertEqual(dequeued.url, req3.url)
+ self.assertEqual(dequeued.priority, req3.priority)
+ self.assertEqual(queue.close(), [-1, -2])
+
+
+class DownloaderAwarePriorityQueueTest(unittest.TestCase):
+ def setUp(self):
+ crawler = get_crawler(Spider)
+ crawler.engine = MockEngine(downloader=MockDownloader())
+ self.queue = DownloaderAwarePriorityQueue.from_crawler(
+ crawler=crawler,
+ downstream_queue_cls=FifoMemoryQueue,
+ key="foo/bar",
+ )
+
+ def tearDown(self):
+ self.queue.close()
+
+ def test_push_pop(self):
+ self.assertEqual(len(self.queue), 0)
+ self.assertIsNone(self.queue.pop())
+ req1 = Request("http://www.example.com/1")
+ req2 = Request("http://www.example.com/2")
+ req3 = Request("http://www.example.com/3")
+ self.queue.push(req1)
+ self.queue.push(req2)
+ self.queue.push(req3)
+ self.assertEqual(len(self.queue), 3)
+ self.assertEqual(self.queue.pop().url, req1.url)
+ self.assertEqual(len(self.queue), 2)
+ self.assertEqual(self.queue.pop().url, req2.url)
+ self.assertEqual(len(self.queue), 1)
+ self.assertEqual(self.queue.pop().url, req3.url)
+ self.assertEqual(len(self.queue), 0)
+ self.assertIsNone(self.queue.pop())
+
+ def test_no_peek_raises(self):  # peek must fail loudly when queuelib provides no peek
+ if hasattr(queuelib.queue.FifoMemoryQueue, "peek"):  # only meaningful on queuelib releases without peek
+ raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is defined")
+ self.queue.push(Request("https://example.org"))
+ with self.assertRaisesRegex(NotImplementedError, "The underlying queue class does not implement 'peek'"):
+ self.queue.peek()  # assertRaises(msg=...) only labels a failure; the regex form checks the exception text
+
+ def test_peek(self):
+ if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"):
+ raise unittest.SkipTest("queuelib.queue.FifoMemoryQueue.peek is undefined")
+ self.assertEqual(len(self.queue), 0)
+ req1 = Request("https://example.org/1")
+ req2 = Request("https://example.org/2")
+ req3 = Request("https://example.org/3")
+ self.queue.push(req1)
+ self.queue.push(req2)
+ self.queue.push(req3)
+ self.assertEqual(len(self.queue), 3)
+ self.assertEqual(self.queue.peek().url, req1.url)
+ self.assertEqual(self.queue.pop().url, req1.url)
+ self.assertEqual(len(self.queue), 2)
+ self.assertEqual(self.queue.peek().url, req2.url)
+ self.assertEqual(self.queue.pop().url, req2.url)
+ self.assertEqual(len(self.queue), 1)
+ self.assertEqual(self.queue.peek().url, req3.url)
+ self.assertEqual(self.queue.pop().url, req3.url)
+ self.assertIsNone(self.queue.peek())
--- /dev/null
+++ b/tests/test_squeues_request.py
@@ -0,0 +1,214 @@
+import shutil
+import tempfile
+import unittest
+
+import queuelib
+
+from scrapy.squeues import (
+ PickleFifoDiskQueue,
+ PickleLifoDiskQueue,
+ MarshalFifoDiskQueue,
+ MarshalLifoDiskQueue,
+ FifoMemoryQueue,
+ LifoMemoryQueue,
+)
+from scrapy.http import Request
+from scrapy.spiders import Spider
+from scrapy.utils.test import get_crawler
+
+
+"""
+Queues that handle requests
+"""
+
+
+class BaseQueueTestCase(unittest.TestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp(prefix="scrapy-queue-tests-")
+ self.qpath = self.tempfilename()
+ self.qdir = self.mkdtemp()
+ self.crawler = get_crawler(Spider)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def tempfilename(self):
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as nf:
+ return nf.name
+
+ def mkdtemp(self):
+ return tempfile.mkdtemp(dir=self.tmpdir)
+
+
+class RequestQueueTestMixin:
+ def queue(self):
+ raise NotImplementedError()
+
+ def test_one_element_with_peek(self):
+ if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"):
+ raise unittest.SkipTest("The queuelib queues do not define peek")
+ q = self.queue()
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.peek())
+ self.assertIsNone(q.pop())
+ req = Request("http://www.example.com")
+ q.push(req)
+ self.assertEqual(len(q), 1)
+ self.assertEqual(q.peek().url, req.url)
+ self.assertEqual(q.pop().url, req.url)
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.peek())
+ self.assertIsNone(q.pop())
+ q.close()
+
+ def test_one_element_without_peek(self):  # push/pop must keep working while peek is unavailable
+ if hasattr(queuelib.queue.FifoMemoryQueue, "peek"):  # only meaningful on queuelib releases without peek
+ raise unittest.SkipTest("The queuelib queues define peek")
+ q = self.queue()
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.pop())
+ req = Request("http://www.example.com")
+ q.push(req)
+ self.assertEqual(len(q), 1)
+ with self.assertRaisesRegex(NotImplementedError, "The underlying queue class does not implement 'peek'"):
+ q.peek()  # assertRaises(msg=...) only labels a failure; the regex form checks the exception text
+ self.assertEqual(q.pop().url, req.url)
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.pop())
+ q.close()
+
+
+class FifoQueueMixin(RequestQueueTestMixin):
+ def test_fifo_with_peek(self):
+ if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"):
+ raise unittest.SkipTest("The queuelib queues do not define peek")
+ q = self.queue()
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.peek())
+ self.assertIsNone(q.pop())
+ req1 = Request("http://www.example.com/1")
+ req2 = Request("http://www.example.com/2")
+ req3 = Request("http://www.example.com/3")
+ q.push(req1)
+ q.push(req2)
+ q.push(req3)
+ self.assertEqual(len(q), 3)
+ self.assertEqual(q.peek().url, req1.url)
+ self.assertEqual(q.pop().url, req1.url)
+ self.assertEqual(len(q), 2)
+ self.assertEqual(q.peek().url, req2.url)
+ self.assertEqual(q.pop().url, req2.url)
+ self.assertEqual(len(q), 1)
+ self.assertEqual(q.peek().url, req3.url)
+ self.assertEqual(q.pop().url, req3.url)
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.peek())
+ self.assertIsNone(q.pop())
+ q.close()
+
+ def test_fifo_without_peek(self):  # FIFO order must hold while peek is unavailable
+ if hasattr(queuelib.queue.FifoMemoryQueue, "peek"):  # only meaningful on queuelib releases without peek
+ raise unittest.SkipTest("The queuelib queues define peek")
+ q = self.queue()
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.pop())
+ req1 = Request("http://www.example.com/1")
+ req2 = Request("http://www.example.com/2")
+ req3 = Request("http://www.example.com/3")
+ q.push(req1)
+ q.push(req2)
+ q.push(req3)
+ with self.assertRaisesRegex(NotImplementedError, "The underlying queue class does not implement 'peek'"):
+ q.peek()  # assertRaises(msg=...) only labels a failure; the regex form checks the exception text
+ self.assertEqual(len(q), 3)  # the failed peek must not have consumed anything
+ self.assertEqual(q.pop().url, req1.url)
+ self.assertEqual(len(q), 2)
+ self.assertEqual(q.pop().url, req2.url)
+ self.assertEqual(len(q), 1)
+ self.assertEqual(q.pop().url, req3.url)
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.pop())
+ q.close()
+
+
+class LifoQueueMixin(RequestQueueTestMixin):
+ def test_lifo_with_peek(self):
+ if not hasattr(queuelib.queue.FifoMemoryQueue, "peek"):
+ raise unittest.SkipTest("The queuelib queues do not define peek")
+ q = self.queue()
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.peek())
+ self.assertIsNone(q.pop())
+ req1 = Request("http://www.example.com/1")
+ req2 = Request("http://www.example.com/2")
+ req3 = Request("http://www.example.com/3")
+ q.push(req1)
+ q.push(req2)
+ q.push(req3)
+ self.assertEqual(len(q), 3)
+ self.assertEqual(q.peek().url, req3.url)
+ self.assertEqual(q.pop().url, req3.url)
+ self.assertEqual(len(q), 2)
+ self.assertEqual(q.peek().url, req2.url)
+ self.assertEqual(q.pop().url, req2.url)
+ self.assertEqual(len(q), 1)
+ self.assertEqual(q.peek().url, req1.url)
+ self.assertEqual(q.pop().url, req1.url)
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.peek())
+ self.assertIsNone(q.pop())
+ q.close()
+
+ def test_lifo_without_peek(self):  # LIFO order must hold while peek is unavailable
+ if hasattr(queuelib.queue.FifoMemoryQueue, "peek"):  # only meaningful on queuelib releases without peek
+ raise unittest.SkipTest("The queuelib queues define peek")
+ q = self.queue()
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.pop())
+ req1 = Request("http://www.example.com/1")
+ req2 = Request("http://www.example.com/2")
+ req3 = Request("http://www.example.com/3")
+ q.push(req1)
+ q.push(req2)
+ q.push(req3)
+ with self.assertRaisesRegex(NotImplementedError, "The underlying queue class does not implement 'peek'"):
+ q.peek()  # assertRaises(msg=...) only labels a failure; the regex form checks the exception text
+ self.assertEqual(len(q), 3)  # the failed peek must not have consumed anything
+ self.assertEqual(q.pop().url, req3.url)
+ self.assertEqual(len(q), 2)
+ self.assertEqual(q.pop().url, req2.url)
+ self.assertEqual(len(q), 1)
+ self.assertEqual(q.pop().url, req1.url)
+ self.assertEqual(len(q), 0)
+ self.assertIsNone(q.pop())
+ q.close()
+
+
+class PickleFifoDiskQueueRequestTest(FifoQueueMixin, BaseQueueTestCase):
+ def queue(self):
+ return PickleFifoDiskQueue.from_crawler(crawler=self.crawler, key="pickle/fifo")
+
+
+class PickleLifoDiskQueueRequestTest(LifoQueueMixin, BaseQueueTestCase):
+ def queue(self):
+ return PickleLifoDiskQueue.from_crawler(crawler=self.crawler, key="pickle/lifo")
+
+
+class MarshalFifoDiskQueueRequestTest(FifoQueueMixin, BaseQueueTestCase):
+ def queue(self):
+ return MarshalFifoDiskQueue.from_crawler(crawler=self.crawler, key="marshal/fifo")
+
+
+class MarshalLifoDiskQueueRequestTest(LifoQueueMixin, BaseQueueTestCase):
+ def queue(self):
+ return MarshalLifoDiskQueue.from_crawler(crawler=self.crawler, key="marshal/lifo")
+
+
+class FifoMemoryQueueRequestTest(FifoQueueMixin, BaseQueueTestCase):
+ def queue(self):
+ return FifoMemoryQueue.from_crawler(crawler=self.crawler)
+
+
+class LifoMemoryQueueRequestTest(LifoQueueMixin, BaseQueueTestCase):
+ def queue(self):
+ return LifoMemoryQueue.from_crawler(crawler=self.crawler)