用更好的地图击败火花

2021-04-26 15:03:15

Warning: Can only detect less than 5000 characters

在计算结束时,我们有长时间的几乎串行代码。这是为什么?从该地图的计算图中可以看到问题,从而减少作业。

当我们下降计算时,我们将单词蒸馏成单词计数对。这是o(语料库尺寸)工作,可以是无限的并行化。但在减少步骤中,我们需要将许多小哈希图结合在一起以检索最终的HashMap。在每个级别,我们结合了一对HashMaps。在所有组合步骤中完成的总工作是O(NUM唯一单词),但要将掉落的HASHMAP对的数量与1.所以我们在结束时陷入困境,为单个工人组合最终地图。

我们能做些什么来解决这种情况?问题源于我们的最终输出 - 从单词计算的地图 - 远远大得多串行处理。因此,无论如何,诸如地图缩小的接口,返回诸如常规的单线程Hashmap的“串行对象”(例如常规,单线程的Hashmap)被注定要在某些时候要求在一个线程上处理大O(NUM唯一单词)。

当然,我们可以调查锁定或多线程的HashMaps,假设共享内存系统,但是每个Joe Hellerstein的“泳道”直觉,它是优选的,而是框架将本地保存到单个CPU的缓存中的框架与共享存储器方法不同,尽可能地(此偏好也适用于分布式计算)。

MAP减少依赖于以下抽象,它自然地导致单个输出“串行对象”\(u \)从类型的类型\(x \)的集合中:

\ [开始{align} \ mathrm {map}&:x \ lightarrow u \\\\\ mathrm {dreams}&amp ;: u \ lightarrow u \ lightarrow u \ negu {senge} \] \ [\ begin {对齐} \ mathrm {flatmap}&amp ;: x \ lightarrow [y] \\\\\ mathrm {fold}&:u \ lightarrow y \ lightarrow u \ neat {aligh} \]最终在Unix中非常自然设置,除了\(\ mathrm {flatmap},\ mathrm {flatmap},\ mathrm {fold} \)函数的函数没有要求。示意图,它看起来像这样:

注意,这需要实际更改我们的结果。我们不再提供钥匙的串行HashMap,而是对禁用HashMaps的禁令。由于脱节,这无需额外的合并。

这种方法的优势在地图上减少了键入的输入是(1)没有串行减少步骤,(2)所有计算步骤可以在线进行,并且(3)内存用法界定,与树木降低方法不同,在哪里原则上,所有键都可以在多个瞬态未更新的地图上复制。

我实现了上面的一个版本,用于类似UNIX的文本流接口。 SLB(对于“分叉负载余额”)基本上是平行的--PIPE --Roundrobin将基于哈希分割其输入,并维护并行独立的映射器进程和折叠进程在计算结束时发出线路。这里的分离步骤只是线切级联(其中WordCount的输出线是键值对)。

/ usr / bin / time -f"%e sec"目标/释放/单秒钟\ - 传播器' TR" " " \ n" | RG -V" ^ $"' \ --folder" awk' {a [\ $ 0] ++}结束{for(k在a)print k,a [k]}'" \ --infile enwik9.clean \ --outprefix wikslb。#6.20 seccat wikslb。* |排序--Parallel = $(nproc)-k2nr -k1 |头-5#7797642#4855049#和3059322#在2621192#a 2332364

好多了! Flatmap操作Tr" " " \ n" | RG -V" ^ $"它在自己的行中放置了每个单词,是天然的UNIX线流操作。文件夹,awk' {a [$ 0] ++}结束{for(k在a)print k,a [k]}'满是追踪一个简单的键控计数器。这使得SLB在UNIX方式中并行化键控的键合作,这方便ML使用情况,例如:

SLB有各种有趣的扩展;查看仓库以获取详细信息和示例。

P.S.如今,Spark有一个更先进的API,可以看出我们并行计算的完整AST:

pyspark ...>>>来自Pyspark.sql.functions Import Split,Col,爆炸>>> ctr = spark.read.text(' enwik9.clean')\ ... .select(爆炸(拆分('值'),' \ s +& #39;))。别名(' word'))\ ....在哪里(' word')!=#39;')\ .. 。.groupby(' word')\ ... .count()\ ... .collect()>>> ctr.sort(key = lambda r:r [" count"])>>> ctr [-5:] [行(word =' a' count = 2332364),行(word ='在',count = 2621192),行(word =&#39 ;和#39;,count = 3059322),行(Word ='',count = 4855049),行(word ='',count = 7797642)]

GroupBy部分现在在34秒内运行。 正如我们所看到的,我们也有更高的利用率: