Reposted from: The Road of Innovation: Redesigning the Scrapy Scheduler to Double Your Crawler's Speed
Welcome to TheWeiJun's WeChat official account! I'm TheWeiJun, a blogger passionate about crawlers and reverse engineering. In this article we will explore how to rewrite the Scrapy scheduler source code and reveal the secrets behind the crawler. Drawing on study and hands-on practice, I'll share techniques for optimizing and extending the scheduler to help you build a more efficient, flexible crawling system. Let's set off and open a new chapter in crawler technology!
Special note: articles on this account are for academic research only and must not be used for any unlawful purpose. If there is any infringement, please contact the author for removal.
Contents

1. Introduction  2. Case Study  3. Source Code Analysis  4. Source Code Refactoring  5. Summary
Fun Story

Tao is a crawler engineer plagued by a queue backlog whose requests kept failing because their signatures expired. Determined to crack the problem, he set out to walk the maze of code and refactor the Scrapy scheduler. Relying on his wits and technical skill, Tao recruited a crack squad of coders, and together they set off on a spectacular adventure through an ocean of code in search of a solution. In the end, Tao and his team broke the curse of the queue backlog, and expired signatures troubled them no more. His story inspires countless crawler engineers to believe that no matter how big the obstacle, pressing forward will always reveal a path through the code to victory. (The plot is fictional 😂)
1. Introduction
In the Scrapy framework, scheduler priority is a key concept. It decides the order in which crawl requests are sent, so the crawler can collect data according to well-defined rules and priorities. Setting scheduler priorities sensibly improves the crawler's efficiency and flexibility and makes data collection more orderly.

In this article we explore how Scrapy's scheduler priorities work and how to apply them. We will look at the default priority behaviour and its internals, including how the request queue is ordered and how the priority algorithm works, and then discuss how to customize priorities to suit the needs of a specific crawling task.

More importantly, we will step into a genuinely fascinating area: rewriting the Scrapy scheduler. By redesigning and optimizing the scheduler source code we can implement more flexible, efficient scheduling strategies and give our data-collection tasks considerably more power.

Let's begin this journey into Scrapy scheduler priorities and see how rewriting the scheduler source can raise both crawler performance and data quality!
2. Case Study
- Before the case study, a quick refresher: how many scheduling modes does Scrapy offer? The answer:
- FIFO (First-In-First-Out) scheduling: the default described here; requests are dispatched in the order they arrive, first in, first out. This suits most data-collection tasks.
- LIFO (Last-In-First-Out) scheduling: the most recently enqueued request is processed first, i.e. last in, first out. Setting DEPTH_PRIORITY to a negative value enables LIFO-style scheduling. This suits scenarios where the newest requests should be handled first.
- Priority-based scheduling: requests carry a priority field and are dispatched accordingly. Scrapy ships a default priority queue that orders requests by priority, which you set via the priority field or the priority argument of Request() (a minimal sketch follows this list).
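To make the third mode concrete, here is a minimal sketch (the spider name, URLs and CSS selectors are placeholders, not from the original article) showing how the priority argument of Request() shifts the dispatch order:

```python
import scrapy


class PriorityDemoSpider(scrapy.Spider):
    # Hypothetical spider used only to illustrate the priority parameter.
    name = "priority_demo"
    start_urls = ["https://example.com/list"]

    def parse(self, response):
        # Larger priority values are popped from the default priority queue
        # earlier, so detail pages jump ahead of further pagination requests.
        for href in response.css("a.detail::attr(href)").getall():
            yield scrapy.Request(response.urljoin(href), callback=self.parse_detail, priority=10)

        next_page = response.css("a.next::attr(href)").get()
        if next_page:
            yield scrapy.Request(response.urljoin(next_page), callback=self.parse, priority=0)

    def parse_detail(self, response):
        yield {"url": response.url, "title": response.css("title::text").get()}
```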
- We found a website that strictly validates a sign generated from an encrypted timestamp, and wrote the following code:
```python
# -*- coding: utf-8 -*-
# -------------------------------
# @author : 逆向与爬虫的故事
# @time : 2023.06.23 00:24:25
# -------------------------------
import requests

headers = {
    'accept': 'application/json',
    'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'cache-control': 'no-cache',
    'content-type': 'application/json;charset=utf-8',
    # 'cookie': '_ga=GA1.2.1512830416.1680156171; ifVisitOldVerBBS=false; CLASS_CASTGC=1ba95109a985f60bb48cf85776b8a5d61a880c4b3b00352c32a9fba7617370dc21d897e28b0abd1ee97a8c3cb45a0dab578216bb9f7e71343a2d01d6eedad3fb9b5a877d1659651128822695fb70c98ce6bcee09a2ceab388ee25db71bbb42cfe3d1a187cba9aac0eac39a32639fb07b0e213dd3f131f8cb321d4b1ee707d9c8c71a897b689bd9921ed10b68ec0b3d09457a5ba2e7664884732e16abc520f006d678c55b3090aeb5439f03bf31e892ea38c9bd9209cdbbf4bb3ada03f22086c37dbf4b43734671639f59c713b9514879e85dc57f1fdbd5ca33d80b4a8cc5ae6e7e1b15cfbbc568c877f9845ec68b0a9a7626b4be3fdb442921d627c3caa16feb; JUTE_BBS_DATA=fdc78ce6fc7a2d05e60b273a3e143a0b464e75e18873971f3abc6099c1ce16455a39e05e101bd13ee19f654359e9f7928baa4de76dcf768645558cb8239c9ced47becc119e7f88a953f9f5d8dff6a6b2; __utma=71015183.1512830416.1680156171.1680529726.1680529726.1; __utmz=71015183.1680529726.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); __auc=f2fc974f18747609d7ad4018f41; Hm_lvt_c51769f3a7d072905f2b2fab3c316d5a=1680156624,1680490506,1680580979; Hm_lvt_f09abeee20593b99a994cc5d57fc19fe=1680922843; Hm_lvt_8a6dad3652ee53a288a11ca184581908=1680529726,1680939956; sajssdk_2015_cross_new_user=1; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22188e3edfbcc809-0684f39197ea164-1b525634-1296000-188e3edfbcd2765%22%2C%22first_id%22%3A%22%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg4ZTNlZGZiY2M4MDktMDY4NGYzOTE5N2VhMTY0LTFiNTI1NjM0LTEyOTYwMDAtMTg4ZTNlZGZiY2QyNzY1In0%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%22%2C%22value%22%3A%22%22%7D%2C%22%24device_id%22%3A%22188e3edfbcc809-0684f39197ea164-1b525634-1296000-188e3edfbcd2765%22%7D; dxy_da_cookie-id=8c23c0ba128210b3d997bb23bc8113a61687451204937; JUTE_SESSION_ID=cb757a68-9b60-4877-ac15-96fc10b88aa6',
    'pragma': 'no-cache',
    'sec-ch-ua': '"Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"macOS"',
    'sec-fetch-dest': 'empty',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'same-origin',
    'sentry-trace': '0f05186b4e4b4e76809573e16e837926-b100e9455b69591e-0',
    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
}

params = {
    'postId': '47212416',
    'serverTimestamp': '1687451361781',
    'timestamp': '1687451361942',
    'noncestr': '29088263',
    'sign': '550ec2b0ac50b3b7bffe89f89a5c1d9c7fdccf8e',
}

response = requests.get('https://xxxx/xxxx/post/detail', params=params, headers=headers)
print(response.text)
```
Running the code we just wrote, the response comes back as follows:

Summary: the message in the response shows that the signature has expired, i.e. the site validates timestamp and sign strictly. Next we reproduce the sign encryption and send the request again to see what happens.
- Since the focus of this article is rewriting the Scrapy scheduler to solve signature expiry caused by queue backlog, we skip the reverse engineering of the sign algorithm and go straight to the result:
```python
def make_url_from_params(self, url: str, params: Dict) -> str:
    server_timestamp: int = int(time.time() * 1000)
    timestamp: int = int(server_timestamp) - random.randint(1000, 9999)
    nonce_str: int = random.randint(10000000, 99999999)
    params['serverTimestamp'] = server_timestamp
    params['timestamp'] = timestamp
    params['noncestr'] = nonce_str
    params['appSignKey'] = self.salt
    sort_list: List = sorted(params.items(), key=lambda x: x[0])
    _params: Dict = RequestWrapper.add_sign(params, sort_list)
    full_url: str = RequestWrapper.url_encode(url, _params)
    return full_url
```
- With the sign algorithm decrypted and reproduced, the complete code looks like this; we send the request again to test it:
```python
# -*- coding: utf-8 -*-
# -------------------------------
# @author : 逆向与爬虫的故事
# @time : 2023.06.23 00:24:25
# -------------------------------
import time
import random
from typing import Dict, List

import requests

from utils.request_action import RequestWrapper


def make_url_from_params(url: str, params: Dict) -> str:
    server_timestamp: int = int(time.time() * 1000)
    timestamp: int = int(server_timestamp) - random.randint(1000, 9999)
    nonce_str: int = random.randint(10000000, 99999999)
    params['serverTimestamp'] = server_timestamp
    params['timestamp'] = timestamp
    params['noncestr'] = nonce_str
    params['appSignKey'] = "SgcQi7FgYdqrHqKB7aGqEZ4o7yssa2aEXoV3bQwh12FFgVNlpyYk2Yjm9d2EZGeGu3"
    sort_list: List = sorted(params.items(), key=lambda x: x[0])
    _params: Dict = RequestWrapper.add_sign(params, sort_list)
    full_url: str = RequestWrapper.url_encode(url, _params)
    return full_url


headers = {
    'accept': 'application/json',
    'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'cache-control': 'no-cache',
    'content-type': 'application/json;charset=utf-8',
    # 'cookie': '_ga=GA1.2.1512830416.1680156171; ifVisitOldVerBBS=false; CLASS_CASTGC=1ba95109a985f60bb48cf85776b8a5d61a880c4b3b00352c32a9fba7617370dc21d897e28b0abd1ee97a8c3cb45a0dab578216bb9f7e71343a2d01d6eedad3fb9b5a877d1659651128822695fb70c98ce6bcee09a2ceab388ee25db71bbb42cfe3d1a187cba9aac0eac39a32639fb07b0e213dd3f131f8cb321d4b1ee707d9c8c71a897b689bd9921ed10b68ec0b3d09457a5ba2e7664884732e16abc520f006d678c55b3090aeb5439f03bf31e892ea38c9bd9209cdbbf4bb3ada03f22086c37dbf4b43734671639f59c713b9514879e85dc57f1fdbd5ca33d80b4a8cc5ae6e7e1b15cfbbc568c877f9845ec68b0a9a7626b4be3fdb442921d627c3caa16feb; JUTE_BBS_DATA=fdc78ce6fc7a2d05e60b273a3e143a0b464e75e18873971f3abc6099c1ce16455a39e05e101bd13ee19f654359e9f7928baa4de76dcf768645558cb8239c9ced47becc119e7f88a953f9f5d8dff6a6b2; __utma=71015183.1512830416.1680156171.1680529726.1680529726.1; __utmz=71015183.1680529726.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); __auc=f2fc974f18747609d7ad4018f41; Hm_lvt_c51769f3a7d072905f2b2fab3c316d5a=1680156624,1680490506,1680580979; Hm_lvt_f09abeee20593b99a994cc5d57fc19fe=1680922843; Hm_lvt_8a6dad3652ee53a288a11ca184581908=1680529726,1680939956; sajssdk_2015_cross_new_user=1; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22188e3edfbcc809-0684f39197ea164-1b525634-1296000-188e3edfbcd2765%22%2C%22first_id%22%3A%22%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg4ZTNlZGZiY2M4MDktMDY4NGYzOTE5N2VhMTY0LTFiNTI1NjM0LTEyOTYwMDAtMTg4ZTNlZGZiY2QyNzY1In0%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%22%2C%22value%22%3A%22%22%7D%2C%22%24device_id%22%3A%22188e3edfbcc809-0684f39197ea164-1b525634-1296000-188e3edfbcd2765%22%7D; dxy_da_cookie-id=8c23c0ba128210b3d997bb23bc8113a61687451204937; JUTE_SESSION_ID=cb757a68-9b60-4877-ac15-96fc10b88aa6',
    'pragma': 'no-cache',
    'sec-ch-ua': '"Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"macOS"',
    'sec-fetch-dest': 'empty',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'same-origin',
    'sentry-trace': '0f05186b4e4b4e76809573e16e837926-b100e9455b69591e-0',
    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
}

params = {
    'postId': '47212416',
    # 'serverTimestamp': '1687451361781',
    # 'timestamp': '1687451361942',
    # 'noncestr': '29088263',
    # 'sign': '550ec2b0ac50b3b7bffe89f89a5c1d9c7fdccf8e',
}
full_url = make_url_from_params('https://xxxxx/xxx/post/detail', params)
response = requests.get(full_url, params=params, headers=headers)
print(response.text)
```
Summary: the printed output is exactly what we want, which proves the sign reproduction is correct. Next we port the requests code to Scrapy so its high concurrency can drive fast collection. In doing so, the large number of requests created a queue backlog, which in turn caused signatures to expire. We tried all three of Scrapy's scheduling modes and found that none of them solves the signature-expiry problem caused by the backlog. That led the author to two possible approaches:

A. Cap the queue length so requests stay within a reasonable timestamp window; as long as the timestamp has not expired, the request passes validation.

B. Rewrite the Scrapy scheduler so that the signature is not computed when a request is enqueued, but only when it is about to be sent.
- Hands-on test of approach A: we write a middleware class that monitors the queue length and pauses request dispatch once a threshold is reached. The full code is as follows:
```python
from scrapy.exceptions import NotConfigured


class QueueLengthMiddleware:
    def __init__(self, queue_threshold):
        self.queue_threshold = queue_threshold

    @classmethod
    def from_crawler(cls, crawler):
        # Read the queue threshold from the settings
        queue_threshold = crawler.settings.getint('QUEUE_THRESHOLD', 100)
        # If no threshold is configured, disable the middleware
        if not queue_threshold:
            raise NotConfigured
        return cls(queue_threshold)

    def process_spider_input(self, response, spider):
        # Check the queue length
        if len(spider.crawler.engine.slot.scheduler) >= self.queue_threshold:
            # Queue length exceeds the threshold: pause the engine
            spider.crawler.engine.pause()
        return None

    def process_spider_output(self, response, result, spider):
        # Check the queue length
        if len(spider.crawler.engine.slot.scheduler) < self.queue_threshold:
            # Queue length is back below the threshold: resume the engine
            spider.crawler.engine.unpause()
        return result
```
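For completeness, this is roughly how the middleware above would be wired up. A minimal settings sketch, assuming the class lives in a hypothetical myproject/middlewares.py; QUEUE_THRESHOLD is the setting name read in from_crawler:

```python
# settings.py (sketch)
SPIDER_MIDDLEWARES = {
    "myproject.middlewares.QueueLengthMiddleware": 543,  # hypothetical module path
}
QUEUE_THRESHOLD = 100  # pause the engine once this many requests are queued
```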
Summary: after starting the crawl, the signature-expiry problem did improve. But capping the queue length effectively caps concurrency, so we lose Scrapy's high-concurrency advantage. Clearly this is not the right way to solve the problem. Next we need to read the Scrapy source and refactor the scheduler and related components to address it properly.
3. Source Code Analysis

- Let's analyze the Scrapy source. A full walkthrough can be found in the earlier article "Scrapy源码分析之Scheduler模块（第六期）" (Scrapy Source Code Analysis: the Scheduler Module, Part 6); the account's home page also has a Scrapy source-code series. Here we only look at the important code:
```python
import importlib
import six

from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        self.df = load_object(self.dupefilter_cls).from_spider(spider)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0
```
Summary: the code above is part of the scrapy-redis scheduler (Scheduler); it pushes requests onto the Redis queue and pops the next request off it. We will override these methods later. A settings sketch below shows how this scheduler is typically enabled; after that, let's look at the engine module.
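Mapping the settings documented in the docstring above onto a project configuration, a typical scrapy-redis setup looks roughly like this (the Redis URL is a placeholder):

```python
# settings.py (sketch) -- enable the Redis-based scheduler analyzed above
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
SCHEDULER_PERSIST = True        # keep the Redis queue between runs instead of flushing it
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.PriorityQueue"  # FifoQueue / LifoQueue also available
REDIS_URL = "redis://127.0.0.1:6379/0"
```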
- We analyze the engine source mainly to confirm how the engine uses the scheduler to dispatch requests for download. The relevant code is:
```python
@inlineCallbacks
def open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True):
    if self.slot is not None:
        raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
    logger.info("Spider opened", extra={'spider': spider})
    nextcall = CallLaterOnce(self._next_request)
    scheduler = create_instance(self.scheduler_cls, settings=None, crawler=self.crawler)
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.spider = spider
    if hasattr(scheduler, "open"):
        yield scheduler.open(spider)
    yield self.scraper.open_spider(spider)
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    self.slot.nextcall.schedule()
    self.slot.heartbeat.start(5)

def _next_request(self) -> None:
    assert self.slot is not None  # typing
    assert self.spider is not None  # typing

    if self.paused:
        return None

    while not self._needs_backout() and self._next_request_from_scheduler() is not None:
        pass

    if self.slot.start_requests is not None and not self._needs_backout():
        try:
            request = next(self.slot.start_requests)
        except StopIteration:
            self.slot.start_requests = None
        except Exception:
            self.slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': self.spider})
        else:
            self.crawl(request)

    if self.spider_is_idle() and self.slot.close_if_idle:
        self._spider_idle()
```
Summary: there are too many functions to paste them all, so only the key ones are shown. In the engine module, open_spider and _next_request call into the scheduler to dispatch requests queued on disk or in memory, and the scheduler class is loaded from the SCHEDULER setting, which means it can be replaced with a custom implementation.

Inspiration: after reading the scheduler and engine source, the idea becomes clear. Without changing the interfaces of Scrapy's own scheduler and engine components, we can refactor the code so that the timestamp and signature are not computed when a request is enqueued, but are generated only when the request is popped from the queue. On to the rewrite!
4. Source Code Refactoring

1. We refactor the http module first; the complete rewritten code is below (the related code has been submitted to the Scrapy community for merging):
"""This module implements the Request class which is used to represent HTTPrequests in Scrapy.
See documentation in docs/topics/request-response.rst"""import inspectfrom typing import Callable, List, Optional, Tuple, Type, TypeVar, Union, Dict
from scrapy import Requestfrom w3lib.url import safe\_url\_string
import scrapyfrom scrapy.http.common import obsolete\_setterfrom scrapy.http.headers import Headersfrom scrapy.utils.curl import curl\_to\_request\_kwargsfrom scrapy.utils.python import to\_bytesfrom scrapy.utils.url import escape\_ajax
RequestTypeVar = TypeVar("RequestTypeVar", bound="Request")
class MsRequest(Request): """MsRequest """
attributes: Tuple[str, ...] = ( "url", "callback", "method", "headers", "body", "cookies", "meta", "encoding", "priority", "dont\_filter", "errback", "flags", "cb\_kwargs", "params", "urlback", ) """A tuple of :class:`str` objects containing the name of all public attributes of the class that are also keyword parameters of the ``\_\_init\_\_`` method.
Currently used by :meth:`Request.replace`, :meth:`Request.to\_dict` and :func:`~scrapy.utils.request.request\_from\_dict`. """
def \_\_init\_\_( self, url: str, callback: Optional[Callable] = None, method: str = "GET", headers: Optional[dict] = None, params: Optional[Dict] = None, body: Optional[Union[bytes, str]] = None, cookies: Optional[Union[dict, List[dict]]] = None, meta: Optional[dict] = None, encoding: str = "utf-8", priority: int = 0, dont\_filter: bool = False, errback: Optional[Callable] = None, urlback: Optional[Callable] = None, flags: Optional[List[str]] = None, cb\_kwargs: Optional[dict] = None, ) -> None: self.\_encoding = encoding # this one has to be set first self.method = str(method).upper() if not isinstance(priority, int): raise TypeError(f"Request priority not an integer: {priority!r}") self.priority = priority if callback is not None and not callable(callback): raise TypeError(f'callback must be a callable, got {type(callback).\_\_name\_\_}') if errback is not None and not callable(errback): raise TypeError(f'errback must be a callable, got {type(errback).\_\_name\_\_}') if urlback is not None and not callable(urlback): raise TypeError(f'urlback must be a callable, got {type(urlback).\_\_name\_\_}') self.callback = callback self.errback = errback self.urlback = urlback
self.cookies = cookies or {} self.headers = Headers(headers or {}, encoding=encoding) self.params = params self.base\_url = url self.dont\_filter = dont\_filter self.\_set\_url(url) self.\_set\_body(body) self.\_meta = dict(meta) if meta else None self.\_cb\_kwargs = dict(cb\_kwargs) if cb\_kwargs else None self.flags = [] if flags is None else list(flags)
@property def cb\_kwargs(self) -> dict: if self.\_cb\_kwargs is None: self.\_cb\_kwargs = {} return self.\_cb\_kwargs
@property def meta(self) -> dict: if self.\_meta is None: self.\_meta = {} return self.\_meta
def \_get\_url(self) -> str: return self.\_url
def \_set\_url(self, url: str) -> None: if not isinstance(url, str): raise TypeError(f"Request url must be str, got {type(url).\_\_name\_\_}")
s = safe\_url\_string(url, self.encoding) self.\_url = escape\_ajax(s)
url = property(\_get\_url, obsolete\_setter(\_set\_url, 'url'))
def \_get\_body(self) -> bytes: return self.\_body
def \_set\_body(self, body: Optional[Union[str, bytes]]) -> None: self.\_body = b"" if body is None else to\_bytes(body, self.encoding)
body = property(\_get\_body, obsolete\_setter(\_set\_body, 'body'))
@property def encoding(self) -> str: return self.\_encoding
def \_\_str\_\_(self) -> str: return f"<{self.method} {self.url}>"
\_\_repr\_\_ = \_\_str\_\_
def copy(self) -> "Request": return self.replace()
def replace(self, \*args, \*\*kwargs) -> "Request": """Create a new Request with the same attributes except for those given new values""" for x in self.attributes: kwargs.setdefault(x, getattr(self, x)) cls = kwargs.pop('cls', self.\_\_class\_\_) return cls(\*args, \*\*kwargs)
def make\_url\_from\_params(self): url: str = self.urlback(self.base\_url, self.params) self.\_set\_url(url)
@classmethod def from\_curl( cls: Type[RequestTypeVar], curl\_command: str, ignore\_unknown\_options: bool = True, \*\*kwargs ) -> RequestTypeVar: """Create a Request object from a string containing a `cURL <https://curl.haxx.se/>`\_ command. It populates the HTTP method, the URL, the headers, the cookies and the body. It accepts the same arguments as the :class:`Request` class, taking preference and overriding the values of the same arguments contained in the cURL command.
Unrecognized options are ignored by default. To raise an error when finding unknown options call this method by passing ``ignore\_unknown\_options=False``.
.. caution:: Using :meth:`from\_curl` from :class:`~scrapy.http.Request` subclasses, such as :class:`~scrapy.http.JSONRequest`, or :class:`~scrapy.http.XmlRpcRequest`, as well as having :ref:`downloader middlewares <topics-downloader-middleware>` and :ref:`spider middlewares <topics-spider-middleware>` enabled, such as :class:`~scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware`, :class:`~scrapy.downloadermiddlewares.useragent.UserAgentMiddleware`, or :class:`~scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware`, may modify the :class:`~scrapy.http.Request` object.
To translate a cURL command into a Scrapy request, you may use `curl2scrapy <https://michael-shub.github.io/curl2scrapy/>`\_. """ request\_kwargs = curl\_to\_request\_kwargs(curl\_command, ignore\_unknown\_options) request\_kwargs.update(kwargs) return cls(\*\*request\_kwargs)
def to\_dict(self, \*, spider: Optional["scrapy.Spider"] = None) -> dict: """Return a dictionary containing the Request's data.
Use :func:`~scrapy.utils.request.request\_from\_dict` to convert back into a :class:`~scrapy.Request` object.
If a spider is given, this method will try to find out the name of the spider methods used as callback and errback and include them in the output dict, raising an exception if they cannot be found. """ d = { "url": self.url, # urls are safe (safe\_string\_url) "callback": \_find\_method(spider, self.callback) if callable(self.callback) else self.callback, "errback": \_find\_method(spider, self.errback) if callable(self.errback) else self.errback, "urlback": \_find\_method(spider, self.urlback) if callable(self.urlback) else self.urlback, "headers": dict(self.headers), } for attr in self.attributes: d.setdefault(attr, getattr(self, attr)) if type(self) is not Request: d["\_class"] = self.\_\_module\_\_ + '.' + self.\_\_class\_\_.\_\_name\_\_ return d
def \_find\_method(obj, func): """Helper function for Request.to\_dict""" # Only instance methods contain ``\_\_func\_\_`` if obj and hasattr(func, '\_\_func\_\_'): members = inspect.getmembers(obj, predicate=inspect.ismethod) for name, obj\_func in members: # We need to use \_\_func\_\_ to access the original function object because instance # method objects are generated each time attribute is retrieved from instance. # # Reference: The standard type hierarchy # https://docs.python.org/3/reference/datamodel.html if obj\_func.\_\_func\_\_ is func.\_\_func\_\_: return name raise ValueError(f"Function {func} is not an instance method in: {obj}")
def process\_request(request\_func): """request decorator""" def check\_request(self, \*args, \*\*kwargs): request = request\_func(self, \*args, \*\*kwargs) if request and callable(request.urlback): request.make\_url\_from\_params() return request
return check\_request
Summary: the rewritten http module gives the MsRequest class urlback and params attributes and provides a process_request decorator for the rewritten Scheduler to use. Next, we rewrite the Scheduler itself.

- We rewrite the scheduler source; the complete rewritten code is:
```python
from scrapy_redis.scheduler import Scheduler

from http.request import process_request


class CustomScheduler(Scheduler):

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    @process_request
    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request
```
- One last step is all that remains to wire up the whole flow, as shown in the screenshot below:
Summary: here is the complete flow. We subclassed Request in the http module and added our own params attribute and urlback callback. This design ties back to the inspiration above: only after the scheduler pops a request from the queue is its signature computed. That guarantees every request is signed in real time, so a queue backlog can no longer invalidate signatures. The author has verified that the same approach also works when requests go through a VPS proxy pool, so expired proxies and failed signatures caused by backlog are no longer a concern.
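To make the end-to-end wiring concrete, here is a minimal sketch of how a spider might use the pieces. The module paths and the import of the signing function are assumptions loosely following the article's own layout, and the spider name and post ID are placeholders:

```python
# settings.py (sketch)
SCHEDULER = "scheduler.CustomScheduler"  # assumed module path for the rewritten scheduler above

# spiders/post_demo.py (sketch)
import scrapy

from http.request import MsRequest           # the rewritten Request class shown above
from sign_utils import make_url_from_params  # hypothetical module holding the signing function from section 2


class PostDemoSpider(scrapy.Spider):
    name = "post_demo"

    def start_requests(self):
        for post_id in ["47212416"]:
            # No timestamp/noncestr/sign here: the request is queued unsigned, and
            # CustomScheduler.next_request() (wrapped by @process_request) calls
            # urlback(base_url, params) right before handing the request to the downloader.
            yield MsRequest(
                url="https://xxxx/xxxx/post/detail",
                params={"postId": post_id},
                urlback=make_url_from_params,
                callback=self.parse,
            )

    def parse(self, response):
        yield {"url": response.url, "body": response.text}
```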
- With all the code in place, we start Scrapy again and finally get a satisfying result. A portion of the log is shown below:
5. Summary

Looking back over the whole analysis, the key points are:
- Get familiar with Scrapy's scheduler and engine modules
- Get familiar with the scheduler's different scheduling modes
- Understand how Scrapy works internally
- Be able to refactor the source code for your own needs
- Introduce a timestamp mechanism to achieve real-time scheduling
The author has submitted the related code to the Scrapy community and has also open-sourced it 💕

That's all for this issue. See you next time ☀️☀️✌️
*END*
About the author

I'm TheWeiJun. I believe in lifelong growth, refuse to put myself in a box, and love technology without being confined by it. I enjoy sharing, reading, and making friends; feel free to scan my WeChat QR code to connect 💕 I share thoughts on crawling and reverse engineering from my day-to-day work; if anything here is wrong, I'd be glad to discuss and correct it 💕