如何礼貌地抓取和分析500 mm图像

2020-08-18 06:07:42

CC搜索的目标是为互联网上所有的知识共享作品建立索引,从图片开始。我们已经索引了超过5亿张图片,我们认为,根据我们的最新统计,这大约占互联网上所有CC许可内容的36%。为了进一步增强我们搜索工具的实用性,我们最近开始爬行和分析图像,以改进搜索结果。本文将通过一些理想化的代码片段和图表,讨论一个大型爬行器的纸质设计、实现和投入生产的过程。完整的源代码可以在GitHub上查看。

最初,当我们发现一张图片并将其插入CC搜索时,我们甚至没有费心下载它;我们将URL插入我们的数据库中,并将该图片嵌入到我们的搜索结果中。这种方法有很多问题:

我们不知道图像的尺寸或压缩质量,这对于相关性目的(将低质量图像降级)和过滤都很有用。例如,一些用户只对高分辨率图像感兴趣,并且想要排除特定大小以下的内容。

我们不能对任何图像运行任何类型的计算机视觉分析,这对于通过对象识别丰富搜索元数据很有用。

嵌入第三方内容充满了问题。如果另一方的服务器宕机,图像因链接损坏而消失,或者它们的TLS证书过期怎么办?这些情况中的每一种都会导致搜索结果中出现损坏的图像或浏览器提示安全性降低。

我们解决了(3)通过在搜索结果中的图像和它们的第三方来源之间设置缓存缩略图代理,以及一些最后时刻的活跃性检查,以确保图像没有404和404;d。

然而,如果不实际下载图像并对文件内容执行一些分析,则不可能解决(1)和(2)。为了重现用户在图像搜索中习以为常的功能,我们需要一个功能相当强大的爬行系统。

在几千张图片的规模上,很容易拼凑出几个脚本来吐出这些信息,但对于5亿张图片来说,要克服很多障碍。

我们想要有礼貌地爬行;然而,图像的集中度和数量意味着我们必须以高爬行率命中一些来源,才有希望在合理的时间内完成爬行。我们的数据来源从只有一名IT人员的非营利性博物馆到拥有自己的数据中心和数千名员工的科技公司;爬行速度必须量身定做,以便从大公司快速下载,但不会压倒小来源。同时,我们需要确保我们没有高估任何来源的容量,并注意我们的爬行器正在使服务器紧张的迹象。

为了在合理的时间内完成爬行和分析任务,我们需要将处理每一幅图像的时间保持在尽可能短的时间内。这意味着爬行和分析任务需要并行分布到多台机器上。

这个爬虫会产生很多元数据。将其与我们的内部系统集成的步骤不需要阻止调整大小的任务。这表明,在将消息写入我们的数据层之前,需要使用消息总线来缓冲消息,因为在数据层中写入可能非常昂贵。

我们希望以每个源的错误计数、状态代码和爬行率摘要的形式对爬网的进展情况有一个基本的了解。

总而言之,这里的挑战与其说是制作一个真正快速的爬虫,不如说是为每个来源量身定做爬行速度。至少,我们需要处理并发性和并行性、配置和管理爬行器基础设施的生命周期、捕获输出数据的管道、监视爬网进度的方法、确保系统按预期运行的一套测试,以及实施礼貌策略的可靠方法。这不是一个微不足道的项目,特别是对于我们这个小小的三人技术团队(其中只有一个人可以做所有的爬行工作)。难道我们不能只使用现成的开源爬虫吗?

任何正派的软件工程师在投入项目和重新发明轮子之前,都会考虑现有的选择。我的评估是,虽然有很多开源爬行框架可用,但很少有专注于图像的,有些没有积极维护,所有这些都需要大量的定制来满足我们的爬行策略的要求。此外,许多解决方案比我们的用例要求更复杂,会显著扩展我们对云基础设施的使用,从而导致更高的费用和更令人头疼的运营问题。我使用Apache Nutch、Scrapy Cluster和Frontera进行了试验;现有的选项看起来都不太适合我们的用例。

提醒一下,我们希望最终能爬行互联网上的每一个知识共享项目。有效的爬行是我们的搜索引擎能够提供的功能的核心。除了是实现高质量图像搜索的中心,爬行对于在任何网站上发现任何类型的新的知识共享内容也是有用的。在我看来,这是花一些时间设计自定义爬行解决方案的有力论据,只要功能集的范围有限,我们就可以完全控制该过程。在下一节中,我们将评估从头开始构建爬行器所需的努力。

我们知道,使用一台虚拟机和一个IP地址无法抓取5亿张图像,因此从一开始就很明显,我们需要一种将抓取和分析任务分布到多台机器的方法。一个基本的队列工作者架构将在这里完成这项工作;当我们想要爬行图像时,我们可以将URL分派到入站图像队列,然后一个工作者最终弹出该任务并对其进行处理。卡夫卡将处理所有在工人之间划分和分配任务的艰苦工作。

工作进程执行图像的实际分析,这基本上需要下载图像、提取感兴趣的属性,并将生成的元数据粘贴回Kafka主题中,以便稍后进行后续处理。工作人员还必须包括一些仪器,以符合速率限制和错误报告。

我们还知道,我们需要在工作进程之间共享一些有关爬网进度的信息,例如我们是否已经超过了网站的禁止速率限制、我们在最后一分钟看到状态代码的次数、到目前为止我们已经处理了多少图像,依此类推。因为我们只对共享应用程序状态和聚合统计信息感兴趣,所以像Redis这样的轻量级键/值存储似乎非常合适。

最后,我们需要一个集中控制爬行的监督过程。这一关键的管理过程将负责通过调整每个源的爬行率、在出现错误时采取行动以及向爬行器操作员报告统计数据来确保我们的爬行器工作人员行为正常。我们将此进程称为爬网监视器。

在更高的层面上,构建快速爬虫的问题对于我们的团队来说似乎是可以解决的,即使是在数亿张图像的规模上也是如此。如果我们能保持每秒200张图片的抓取和分析速度,我们可以在大约一个月的时间里抓取所有5亿张图片。

在下一节中,我们将检查组成爬虫程序的一些关键组件。

爬行是一项受大量IO限制的任务。工作人员需要与Kafka和Redis等内部系统以及持有目标图像的第三方网站保持大量同时打开的连接。一旦我们将图像存储在内存中,执行实际的分析任务就变得既简单又便宜。由于这些原因,异步方法似乎比使用多线程执行更有吸引力。即使我们的图像处理任务变得越来越复杂,并且变得受限于CPU,我们也可以通过将重量级任务卸载到进程池来两全其美。有关更多详细信息,请参阅异步文档中的运行阻塞代码。

异步方法可能是可取的另一个原因是,我们有几个需要实时响应事件的互锁组件:我们的爬网监控进程需要同时控制速率限制进程,如果检测到错误还需要中断爬网,而我们的工作进程需要使用爬网事件、处理图像、上传缩略图,并生成记录每个图像的元数据的事件。通过进程间通信协调所有这些组件可能很困难,但将任务分解成小块并屈服于事件循环相对容易。

这是我们的爬行系统中最重要的部分:实际执行图像获取和处理工作的部分。如前所述,我们需要并发执行此任务,因此需要使用Async/Await语法定义所有内容,以允许事件循环执行多任务。在其他方面,实际任务本身是简单的。

通常,在设计高并发软件时,目标是最大化吞吐量并将服务器推向其绝对极限。网络爬虫的情况正好相反,特别是当你经营一个完全依赖他人善意存在的非营利性组织时。我们希望尽可能合理地确定,我们不会因为意外的DDoS而将资源从互联网上删除。与此同时,我们需要尽可能快地爬行,以应对资源充足的来源,以承受沉重的爬行,否则我们将永远不会完成爬行。我们如何才能使我们的爬行率与网站的能力相匹配呢?

最初,我的计划是通过自适应速率限制策略来确定这一点,在该策略中,我们将从低速率限制开始,并使用爬山算法来确定最佳速率。我们可以跟踪第一个字节的时间(TTFB)和带宽速度等指标,以确定我们已经开始给上游服务器带来压力的确切时刻。然而,这里有很多缺点:

假设性能会稳步下降,而不是一下子全部失败,这可能是不正确的。

我们无法检测我们是导致性能问题的原因,还是主机只是由于配置错误或高流量而出现服务器故障。由于流量的正常波动,我们可能会陷入次优的速率限制。

在Python中记录TTFB很困难,因为它需要对连接数据的低级别访问。我们可能需要为aiohttp编写一个扩展才能获得它。

最终我觉得这太麻烦了。我们能用更简单的策略完成这项工作吗?

事实证明,网站的大小通常与基础设施能力相关。这背后的原因是,如果您能够托管450 mm的图像,那么您可能每秒至少能够处理几百个服务流量的请求。在我们的例子中,我们已经知道一个源有多少个图像,所以我们很容易将我们的速率限制固定在小网站的低最小值和大网站的合理最大值之间,然后插入介于两者之间的所有内容的速率限制。

当然,重要的是要注意到,这只是一个粗略的启发式方法,我们用它来合理地猜测一个网站可以处理什么。我们必须承认,尽管我们采取了预防措施,但我们设定的利率限制可能过于激进。

如果我们的启发式方法不能正确地近似某个站点的带宽能力,我们将开始遇到问题。首先,我们可能会超过服务器端的速率限制,这意味着我们将看到超过429个速率限制和403个禁止的错误,而不是我们正在尝试爬行的图像。更糟糕的是,上游源可能会继续愉快地为请求提供服务,而我们会吸收他们所有的流量容量,从而导致其他用户无法查看图像。显然,在任何一种情况下,如果我们似乎影响了他们的正常运行时间,我们都需要降低爬取率,甚至完全放弃爬取源。

为了处理这些情况,我们的工具箱中有两个工具:一个滑动窗口,记录我们在过去60秒内向每个域发出的每个请求的状态代码,以及每个网站最近50个状态的列表。如果我们的一分钟窗口中的错误数超过10%,则说明有问题;我们应该等待一分钟再重试。但是,如果我们连续遇到许多错误,这表明我们在某个特定站点上遇到了问题,因此我们应该放弃爬行源代码,并发出警报。

工作人员可以在Redis中的排序集中跟踪此信息。对于滑动错误窗口,我们将按其时间戳对每个请求进行排序,这将使我们可以轻松且廉价地使状态码在滑动窗口间隔之后过期。维护最后N个响应代码的列表甚至更容易;我们只需将状态代码放在与源关联的列表中。

类统计管理器:def__init__(self,redis):self。Redis=redis self。KNOWN_SOURCES=set()@staticmethod Async def_Record_Window_Samples(PIPE,SOURCE,STATUS):";";";将状态插入所有滑动窗口。";";";NOW=时间。Monotonic()#基于时间的滑动窗口,窗口对中的STAT_KEY,间隔:KEY=f';{STAT_KEY}{source}';等待管道。Zadd(Key,Now,f';{状态}:{时间。Monotonic()}';)#从窗口等待管道外部删除事件。Zem rangebycore(key,';-inf';,now-interval)#";最近n个请求等待管道窗口。Rush(f';{last_50_request}{source}';,status)等待管道。Ltrim(f';{last_50_request}{source}';,-50,-1)。

同时,爬行监视进程可以跟踪每个错误阈值的内容。

当在最后一分钟内向源发出的请求超过10%是错误时,我们将在Redis中设置停止条件,并停止补充速率限制令牌(更多信息如下所述)。

现在=时间。Monotonic()One_Minint_Window=等待Redis。Zrangebycore(One_Minint_Window_Key,';-inf';,Now-60)One_Minin_Window中的状态错误=0成功=0:如果状态不在预期状态:错误+=1否则:成功+=1如果不成功或错误/成功>;容差:等待编辑。Sadd(TEMP_HALTED_SET,源)。

为了检测严重错误(我们已经连续看到50个失败的请求),我们将设置永久停止条件。必须有人手动排除故障,然后重新打开该源的Crawler。

LAST_50_STATUS_KEY=f';statuslast50req:{source}';LAST_50_STATUS=等待编辑。Lrange(LAST_50_STATUS_KEY,0,-1)如果len(LAST_50_STATUS)>;=50且_EVERY_REQUEST_FAILED(LAST_50_STATUS):等待redis。Sadd(HALTED_SET,SOURCE)。

在实践中,保持跟踪误差阈值的滑动窗口并设置合理的最小和最大爬行率已经足够好,以至于断路器从未被跳闸。

制定爬行策略是一回事;实际执行则完全是另一回事。我们如何协调我们的多个爬行进程,以防止它们超出我们的速率限制?

答案是实现分布式令牌桶系统。这背后的想法是,每个爬虫程序在发出请求之前都必须从Redis获得一个令牌。爬网监视器每秒设置一个变量,其中包含可以对源发出的请求数。每个Crawler进程在发出请求之前都会递减计数器。如果递减的结果大于零,则清除Worker进行爬行。否则,已经达到速率限制,我们应该等待,直到获得令牌。

令牌桶的优点在于它们的简单性、性能和抗故障能力。如果我们的Crawler监控进程终止,则爬行将完全停止;如果不先获取令牌,则无法发出请求。与随着爬行监视器完全消失并允许无限制爬行相比,这是一个更好的替代方案。此外,由于递减计数器和检索结果在Redis中是原子操作,因此不存在竞争条件的风险,因此不需要锁定。这对性能是一个巨大的好处,因为协调和阻塞每个请求的开销会迅速使我们的爬行系统陷入困境。

为了确保所有爬行都以正确的速度执行,我用限速版本的类包装了aiohttp.ClientSession。

类RateLimitedClientSession:def__init__(self,aioclient,redis):self。客户=客户自己。Redis=redis异步def_get_Token(SELF,SOURCE):TOKEN_KEY=f';{CURRTOKEN_PREFIX}{source}';TOKENS=INT(等待SELF。雷迪斯。DECR(TOKEN_KEY))如果令牌&>=0:Token_Acquired=True,否则:#个令牌等待异步。睡眠(1)TOKEN_ACCEPTED=FALSE返回TOKEN_ACCENTED异步定义GET(self,url,source):TOKEN_ACCEPTED=FALSE WITH NOT TOKEN_ACCENTED:TOKEN_ACCEPTED=等待自身。_GET_TOKEN(源)返回等待自身。客户。Get(Url)。

我们爬虫程序设计中的最后一个问题是,我们希望以规定的速率限制同时抓取每个单独的网站。这听起来几乎是同义反复的,就像在实施了所有这些防止爬虫工作过快的逻辑之后,我们应该能够理所当然地认为是什么,但事实证明,我们的爬虫的处理能力本身就是一个有限的、有争议的资源。我们只能在每个员工身上同时调度这么多任务,并且我们需要确保来自单个网站的任务不会耗尽其他来源的爬网容量。

例如,假设每个工作者能够同时处理5000个爬行任务,并且每个任务都绑定到一个非常低速率限制的小网站。这意味着我们的整个工作进程(能够每秒处理数百个爬网和分析作业)每秒只能发出一个请求,直到队列中出现一些速度更快的任务。

换句话说,我们需要确保每个工作进程不会被单个源阻塞。我们有一个日程安排问题。我们天真地实行了先到先得,需要换一种不同的调度策略。

有无数种方法可以解决日程安排问题。由于我们的系统中只有几十个源,我们可以使用一个愚蠢的调度算法:在每个工人中为每个源提供相等的容量。换句话说,如果要分配5000个任务和30个源,我们可以为每个工人分配166个同时任务给每个源。对于我们的目的来说,这已经够多了。这种方法有明显的缺点,因为最终会有如此多的来源,以至于我们开始饥饿,高速率限制了工作来源。当我们走到那座桥的时候,我们会跨过这座桥;最好是使用我们能逃脱惩罚的最简单的方法,而不是把我们所有的时间都花在解决假想的未来问题上。

Async def_Schedule(SELF,TASK_SCHEDUE):RAW_SOURCES=等待自身。雷迪斯。SMembers(';入站_源&)来源。

.