爬虫进阶:框架功能升级之断点续爬 | 爬虫 |《python学习之路》| python 技术论坛-江南app体育官方入口
断点续爬
断点续爬设计分析
断点续爬设计原理介绍:
断点续爬的效果:爬虫程序中止后,再次启动,对已经发起的请求不再发起,而是直接从之前的队列中获取请求继续执行。这也就意味着需要实现以下两点:
1.去重标识(历史请求的指纹)持久化存储,使得新的请求可以和以前的请求进行去重对比
2.请求队列也需要持久化存储
其实也就是程序的中止不会造成请求队列和去重容器的消失,再次启动程序后,还能继续访问它们。
断点续爬无丢失方案的实现
断点续爬无丢失的代码实现:
- 添加备份容器:利用redis的hash类型类对每一个请求对象进行存储
- 为request对象设置重试次数属性
- 在调度器的get_request方法中实现响应的逻辑判断
- 实现delete_request方法:从备份中删除对应的reqeust对象
- 实现add_lost_request方法
- 在引擎中调用这些方法,完成断点续爬无丢失需求
# scrapy_plus/redis_hash.py
'''实现一个对redis哈希类型的操作封装'''
import redis
import pickle
from scrapy_plus.http.request import request
from scrapy_plus.conf import settings
class redisbackuprequest(object):
'''利用hash类型,存储每一个请求对象,key是指纹,值就是请求对象'''
redis_backup_name = settings.redis_backup_name
redis_backup_host = settings.redis_backup_host
redis_backup_port = settings.redis_backup_port
redis_backup_db = settings.redis_backup_db
def __init__(self):
self._redis = redis.strictredis(host=self.redis_backup_host, port=self.redis_backup_port ,db=self.redis_backup_db)
self._name = self.redis_backup_name
# 增删改查
def save_request(self, fp, request):
'''将请求对象备份到redis的hash中'''
bytes_data = pickle.dumps(request)
self._redis.hset(self._name, fp, bytes_data)
def delete_request(self, fp):
'''根据请求的指纹,将其删除'''
self._redis.hdel(self._name, fp)
def update_request(self, fp, request):
'''更新已有的fp'''
self.save_request(fp, request)
def get_requests(self):
'''返回全部的请求对象'''
for fp, bytes_request in self._redis.hscan_iter(self._name):
request = pickle.loads(bytes_request)
yield request
为request对象增加重试次数属性:
class request(object): '''框架内置请求对象,设置请求信息''' def __init__(self, url, method='get', headers=none, params=none, data=none, parse='parse', filter=true, meta=none): self.url = url # 请求地址 self.method = method # 请求方法 self.headers = headers # 请求头 self.params = params # 请求参数 self.data = data # 请求体 self.parse = parse # 指明它的解析函数, 默认是parse方法 self.filter = filter # 是否进行去重,默认是true self.retry_time = 0 # 重试次数 self.meta = meta
修改调度器,实现对应的逻辑以及方法:
# scrapy_plus/core/scheduler.py ...... from scrapy_plus.redis_hash import redisbackuprequest ...... class scheduler(object): ''' 缓存请求对象(request),并为下载器提供请求对象,实现请求的调度 对请求对象进行去重判断 ''' def __init__(self,collector): if scheduler_persist: #如果使用分布式或者是持久化,使用redis的队列 self.queue = reidsqueue() self._filter_container = redisfiltercontainer() else: self.queue = queue() self._filter_container = noramlfiltercontainer() self.collector = collector def add_reqeust(self, request): '''存储request对象进入队列 return: none ''' # 先判断是否要去重 if request.filter is false: self.queue.put(request) logger.info("添加请求成功
[%s %s]" % (request.method, request.url)) self.total_request_number = 1 # 统计请求总数 return # 必须return # 判断去重,如果重复,就不添加,否则才添加 fp = self._gen_fp(request) if not self.filter_request(fp, request): # 往队列添加请求 logger.info("添加请求成功[%s %s]"%(request.method.upper(), request.url)) self.queue.put(request) if settings.role in ['master', 'slave']: self._backup_request.save_request(fp, request) # 对请求进行备份 # 如果是新的请求,那么就添加进去重容器,表示请求已经添加到了队列中 self._filter_container.add_fp(fp) self.total_request_number = 1 else: self.repeat_request_number = 1 def get_request(self): '''从队列取出一个请求对象 return: request object ''' try: request = self.queue.get(false) except: return none else: if request.filter is true and settings.role in ['master', 'slave']: # 先判断 是否需要进行去重 # 判断重试次数是否超过规定 fp = self._gen_fp(request) if request.retry_time >= settings.max_retry_times: self._backup_request.delete_request(fp) # 如果超过,那么直接删除 logger.warnning("出现异常请求,且超过最大尝试的次数:[%s]%s"%(request.method, request.url)) request.retry_time = 1 # 重试次数1 self._backup_request.update_request(fp, request) # 并更新到备份中 return request def delete_request(self, request): '''根据请求从备份删除对应的请求对象''' if settings.role in ['master', 'slave']: fp = self._gen_fp(request) self._backup_request.delete_request(fp) def add_lost_reqeusts(self): '''将丢失的请求对象再添加到队列中''' # 从备份容器取出来,放到队列中 if settings.role in ['master', 'slave']: for request in self._backup_request.get_requests(): self.queue.put(request) ......