Julia中的数据并行性简介

2020-10-07 15:01:29

如果您有大量数据,并且必须对每个元素进行类似的计算,则数据并行是使用多个CPU、机器以及GPU加速计算的一种简单方法。虽然这不是唯一的并行类型,但它涵盖了大量的计算密集型程序。使用数据并行性的一个主要障碍是,您需要摒弃一些在顺序计算中有用的习惯(即,模式会导致数据结构的突变)。特别是,使用帮助您描述要计算什么而不是如何计算的库是很重要的。实际上,它意味着使用映射和归约运算的广义形式,并学习如何用它们来表示您的计算。幸运的是,如果您已经知道如何编写迭代器理解,那么访问一大类数据并行计算就没有什么需要学习的了。

如果你想了解数据并行计算的高级概念,小盖伊·L·斯蒂尔(Guy L.Steele Jr.)的InfoQ Talk How to Think for Parallel Programming:不!是一个很棒的介绍(有很多有趣的离题评论)。他的Google TechTalk对一个微不足道的问题的四个解决方案也对进入数据并行思维模式非常有帮助。

本文主要介绍我(Takafumi Arakaki@TKF)开发的Julia包。因此,它目前专注于基于线程的并行。有简单的分布式计算支持。GPU支持是一个经常被要求的功能,但它还没有实现。另请参阅Julia中的其他并行计算库。

还要注意,本简介没有讨论如何使用Threads.@spawn等线程原语,因为它级别太低且容易出错。对于数据并行性,更高级别的描述要合适得多。它还可以帮助您编写更多可重用的代码;例如,将相同的代码用于单线程、多线程和分布式计算。

这里的大多数示例可能适用于所有Julia 1.x版本。但是,为了获得最佳效果,强烈建议您获取最新发布的版本(撰写本文时为1.5.2)。您可以在https://julialang.org/.下载。

获取Julia后,您可以使用Julia REPL中的PKG;Pkg.add([";Transducers";,";ThreadsX";,";OnlineStats";,";FLoops";,";MicroColltions";,";BangBang";,";])运行PKG;Pkg.add([";Transducers";,";ThreadsX";,";,";FLoops";,";])来获取本教程所需的依存关系。

如果您希望使用与测试本教程完全相同的环境,请运行以下命令。

要在Julia中使用多线程,需要用多个执行线程启动它。如果您使用的是Julia 1.5或更高版本,则可以使用-t auto(或等效的--threadsauto)选项启动它:

$julia-t auto_(_)_|文档:https://docs.julialang.org(_)|(_)(_)|_||_|键入";?";获取帮助,键入";]?";获取PKG帮助。|/_`|1.5.2(2020-09-23)_/|\__';_|_||\__';_||https://julialang.org/官方发布|__/|julia>;threads.nthread()#您拥有的核数8。

命令行选项-t/--thread还可以获取要使用的线程数。在较早的Julia版本中,使用JUIA_NUM_THREADS环境变量。例如,在Linux和MacOS上,Julia_NUM_THREADS=4Julia使用4个执行线程启动Juila。

下面的几个示例提到了基于Distributed.jl的并行性。就像设置多线程一样,您需要设置多个工作进程才能获得加速。您可以使用-p auto启动julia(或者,等效地,使用--procs auto)。Distributed.jl还允许您在使用addprocs启动Julia之后添加工作进程:

映射可能是数据并行中最常用的功能。回想一下朱莉娅的顺序地图是如何工作的:

作为一个稍微实用一点的示例,让我们来玩Collatz猜想,它说明了Collatz函数定义为。

我将跳过它的数学背景(因为我对它了解不多),但让我提一下,YouTube上有很多有趣的解释:)。

如果猜想是正确的,则初值所需的迭代次数是有限的。在Julia中,我们可以用以下公式计算。

函数collatz_stop_time(X)n=0,而true x==1&;&;return n n+=1 x=Collatz(X)End End

为了好玩,让我们画出初始值的停止时间从1到10,000:

使用Plotplt=散布(map(Collatz_Stopping_Time,1:10_000),xlabel=";初始值";,ylabel=";停止时间";,label=";";,markercolor=1,markerstrokecolor=1,markersize=3,size=(450,300),)。

Threads.nthread()#我使用BenchmarkTools map(collatz_stop_time,1:100000)用`-t 4`4启动了`julia`;18.116毫秒(2次分配:781.33 KiB)ThreadsX.map(collatz_stop_time,1:100_000);5.391毫秒(1665分配:7.09MiB)。

Julia的迭代器理解语法是合成映射、过滤和展平的强大工具。回想一下,映射可以编写为数组或迭代器理解:

B1=map(x->;x+1,1:3)b2=[x+1 for x in 1:3]#数组理解b3=Collect(x+1 for x in 1:3)#迭代器理解b1==b2==b3b1。

请注意,映射、过滤和展平的更复杂组合也可以并行执行:

使用传感器sc2=dCollect(如果1:x中的y为奇数(X),则1:x中的y为y)c1==c2。

SUM、PROD、MAXIMUM和ALL等函数都是可以并行化的缩减(又名折叠)的示例。当与迭代器理解结合使用时,它们是非常强大的工具。使用ThreadsX.jl,由理解语法创建的迭代器的总和

有关预定义缩减和其他并行化函数的完整列表,请键入ThreadsX。然后在REPL中按Tab键。

在给定的初值范围内,可以用最大值来计算Collatz函数的最大停止时间。

最大值(COLLATZ_STOP_TIME,1:100_000)17.625毫秒(0分配:0字节)350ThreadsX.max(COLLATZ_STOP_TIME,1:100_000)5.024毫秒(1214年分配:69.17KiB)350。

Jl提供了一组非常丰富且可组合的缩减。您可以将其作为第一个参数传递给ThreadsX.duce:

虽然当所有中间数据都可以放入内存时,OnlineStats.jl通常不提供计算给定统计数据的最快方法,但在许多情况下,您并不真正需要绝对最佳的性能。但是,如果ThreadsX.jl+OnlineStats.jl成为瓶颈,则可能值得考虑使用其他方法来计算统计数据。

对于非平凡的并行计算,您需要编写自定义约简。FLoops.jl为编写自定义缩减提供了一组简明的语法。例如,以下是如何在一次扫描中计算两个量的总和的方法:

对zip(1:3,1:2:6)中的(x,y)使用FLoops(1:3,1:2:6)a=x+y b=x-y(s+=a,t+=b)end(s,t)。

在本例中,我们没有初始化s和t;但它不是一个拼写错误。在并行求和中,像s和t这样的累加器的初始状态的唯一合理值是零。因此,@Reduce(s+=a,t+=b)的工作方式就像s和t被初始化为适当的零类型一样。但是,由于julia(0::int,0.0::Float64,(0x00+0x00im)::Complex{UInt8},...)中有许多零,如果输入集合(即,Xs的forx中的值Xs为空),s和t是未定义的。

要控制累加器的类型并避免在空情况下出现UndefVarError,可以使用ACKUCTOR=INITIAL_VALUE OP INPUT语法设置初始值。

对于(x,y)in zip(1:3,1:2:6)a=x+yb=x-y(s2=0.0+a,t2=0im+b)end(s2,t2)。

要理解@Floop with@Reduce(累加器=Initial_Value op input)语法的计算,只需忽略@Reduce(以及相应的s和)就可以大致了解。更具体地说,是:

从累加器=INITIAL_VALUE OP输入中提取表达式CONTERATOR=INITIAL_VALUE(";Initializers";),并将它们放在for循环的前面。

设s2=0.0#initializer t2=0 im#zip(1:3,1:2:6)中(x,y)的初始值设定项a=x+yb=x-y s2=s2+a#从`@Reduce`t2=t2+b#中的`s2=0.0+a`转换而来,`@Reduce`end#\(s2,t2)#`中的`t2=0im+b`-第一个参数现在与左端相同。

简写形式@Reduce(s+=a,t+=b)是通过使用输入集合的第一个元素作为初始值来实现的。

此转换用于生成在单个任务中执行的基本用例。任务的多个结果由@Reduce指定的运算符和函数组合。更明确地说,(S2_RIGHT,T2_RIGHT)通过以下方式组合成(S2_LEFT,T2_LEFT。

如果使用得当,锁和原子可以帮助您编写正确的并发程序。但是,它们通过限制并行执行来做到这一点。使用数据并行模式是获得高性能的最简单方式。

@Reduce()DO语法是FLoops.jl中表达自定义缩减的最灵活方式。当一个变量可以影响减法中的其他变量(例如,下例中的索引和值)时,它非常有用。还要注意,@Reduce可以在循环体中多次使用。以下是并行计算findmin和findmax的方法:

对于(i,x)成对([0,1,3,2])()DO(伊明=-1;i),(xmin=inf;x)如果xmin>;xxmin=x伊明=i End End()Do(imax=-1;i),(xmax=-inf;x)如果xmax;lt;xxmax=ximax=i结束伊明xmin imax xmax。

通过忽略带有@Reduce()DO的行和相应的结尾,我们可以大致理解@Floop的计算。更具体地说,是:

从(累加器=初始_值;输入)或(累加器;输入)中提取表达式CONTERATOR=INITIAL_VALUE(";Initializers";),并将它们放在for循环前面。

设imin2=-1#-+xmin2=inf#|初始化式imax2=-1#|xmax2=-inf#-+for(i,x)成对([0,1,3,2]),如果xmin2>;x#-+xmin2=x#|do block body imin2=i#|end#|if xmax2<;x#|xmax2=x#|imax2=i#|end#-+end imin2 xmin2 imax2 xmax2 end。

上面的计算用于输入集合的每个分区,并由@Reduce()do块定义的Reduce函数组合。也就是说,(imin2_right、xmin2_right、imax2_right、xmax2_right)通过以下方式组合成(imin2_Left、xmin2_Left、imax2_Left、xmax2_Left。

如果XMIN_LEFT&gT;XMIN_RIGHT XMIN_LEFT=XMIN_RIGHT伊明_LEFT=伊明_RIGHT END如果XMAX_LEFT<;XMAX_RIGHT XMAX_LEFT=XMAX_RIGHT IMAX_LEFT=IMAX_RIGHT END。

备注:注意到第一个@Reduce()DO块的x和i被替换为XMIN_RIGHT和伊明_RIGHT,而第二个@Reduce()DO块的x和i被替换为XMAX_RIGHT和IMAX_RIGHT。这就是我们使用两个@Reduce()do块的原因;我们需要将";x/i与xmin/伊明或xmax/imax配对,具体取决于我们所在的IF块。

请注意,编写自定义缩减不一定要使用@Floop。例如,您可以使用ThreadsX.duce编写等效代码:

(imin3,xmin3,imax3,xmax3)=ThreadsX.duce(i,x,i,x)for(i,x)成对([0,1,3,2]));init=(-1,inf,-1,-inf))do(伊明,xmin,imax,xmax),(i1,x1,i2,x2)if xmin>;x1xmin=x1伊明=i1End if xmax<;X2 xmax=x2 imax=i2 End Return(伊明,xmin,imax,xmax)结束(imin3,xmin3,imax3,xmax3)==(伊明,xmin,imax,xmax)。

但是,如您所见,它更加冗长且容易出错(例如,初始值和变量声明在不同的位置)。

MapReduce和Reduce在组合预先存在的操作时非常有用。例如,我们可以通过组合MapReduce、dict和mergewith!:

包含11个条目的字典{Char,Int64}:';f';=>;5';d';=>;6';e';=>;5';j';=>;4';h';=>;5';i';=>;3';k';=>;4';a';=>;2';c';=>;6';g';=>;6';b';=>;4。

注意,这段代码有一个性能问题:dict(x=>;1)为每个迭代分配一个对象。这在线程化的Julia代码中尤其糟糕,因为它频繁调用垃圾回收。为了避免这种情况,我们可以用MicroCollections.SingletonDict替换DICT,它不在堆中分配字典。通过调用Bang.mergewith!!,可以将SingletonDict升级为字典。然后,它将为每个要变异的任务创建一个可变对象。然后,我们可以组成高效的并行直方图操作:

使用BangBang:Mergewith!!使用微集合:SingletonDictf2=ThreadsX.mapduce(x->;SingletonDict(x=>;1),mergewith!(+),str)f1==f2。

让计算某个初始值范围内的Collatz_Stopping_Time直方图。与上面的直方图示例不同,我们知道停止时间是一个正整数。因此,使用数组作为将bin(索引)映射到计数的数据结构是有意义的。没有像mergewith这样的预定义还原函数!我们可以利用。幸运的是,在@flop中使用@Reduce()DO语法可以很容易地编写它:

使用微集合使用FLoop:SingletonDictmaxkey(xs::AbstractVector)=lastindex(Xs)maxkey(xs::SingletonDict)=first(key(Xs))函数Collatz_Columgraph(Xs,Executor=ThreadedEx())x在Xs中的执行器n=Collatz_Stopping_Time(X)n>;0|Continue obs=SingletonDict(n=>;1)()do(hist=Int[];obs)l=length(Hist)m=maxkey(Obs)#obs是向量或SingletonDict(n=>;1)()do(hist=Int[];obs)l=length(Hist)m=maxkey(Obs)#obs是向量或SingletonDict(n=>;1)()do(hist=Int[];obs)l=length(Hist)m=maxkey(Obs)。M#拉伸`hist`,以便将合并结果放入其中。RESIZE!(HIST,m)Fill!(view(HIST,l+1:m),0)end#将`obs`合并为`hist`:对于(k,v)成对(Obs)hist[k]+=v end返回hist end。

如上所述,@Reduce()DO块在两个上下文中使用;对于顺序基本情况,以及用于组合来自两个基本情况的累积结果。因此,为了组合HIST_LEFT和HIST_RIGHT,我们需要将HIST_RIGHT替换为OBS。这就是为什么我们需要处理OBS是SingletonDict和Vector的情况的原因。多亏了多次发货,很容易吸收两个集装箱的差额。我们可以只使用Base为Pair定义的内容,并且只需要定义maxkey来吸收剩余的差异。

当编写@Reduce()DO(L₁=I₁;R₁),(L₂=I₂;R₂),...,(Lₙ=Iₙ;Rₙ)时,请确保DO块体可以处理替换为Rᵢ的Lᵢ的任意可能值,而不仅仅是在For循环体中直接计算的Rᵢ。

使用Plotplt=Plot(Collatz_Columgraph(1:1_000_000),xLabel=";停止时间";,yLabel=";计数";,Label=";";,Size=(450,300),)。

我们用@Floop Executor来...。语法,以便在不同类型的执行机制之间轻松切换;即,顺序执行、线程执行和分布式执行:

HIST1=COLLATZ_DISTGRAM(1:1_000_000,SequentialEx())HIST2=COLLATZ_HISTGRAM(1:1_000_000,ThreadedEx())HIST3=COLLATZ_HISTGRAM(1:1_000_000,DistributedEx())HIST1==HIST2==HIST3。

COLLATZ_STUSTGRAM(1:1_000_000,SequentialEx());220.022毫秒(9个分配:13.81KiB)COLLATZ_STUSTGRAM(1:1_000_000,ThreadedEx());58.271毫秒(155个分配:60.81KiB)。

Julia本身有Threads.@Threads宏表示Threaded for loop,@Distributed宏表示Distributed For循环。它们对于简单的用例是足够的,但也有一些限制。例如,@THREADS没有内置的缩减函数支持。虽然@Distributed宏支持缩减函数,但它仅限于预定义函数,并且处理多个变量非常繁琐。这两个宏都只有简单的静态调度器,并且缺少像FLoops.jl和ThreadsX.jl支持的basesize这样的选项来调优负载平衡。此外,用@线程编写的代码不能在@Distributed中重用,反之亦然。

希望本教程仅涵盖开始编写数据并行程序的最低要求,并且FLoops.jl和ThreadsX.jl的文档现在更易于访问。这两个库基于为Transducers.jl设计的协议,该协议还包含用于数据并行的各种工具。

Transducers.jl;的并行处理教程介绍了类似的主题,并对更多低级细节进行了解释。基于Guy L.Steele Jr.的2009 ICFP Talk的并行字数计算教程更高级,但我发现它是一个很好的例子,可以用来理解巧妙设计的缩减功能可以实现的功能。

请注意,本教程中提出的想法非常笼统,在使用其他库时也应该适用。例如,在CuArray上使用MapReduce时,自定义缩减的想法在GPU计算中很有用。

©荒木高文。上次修改日期:2020年10月6日。用Franklin.jl和Julia编程语言构建的网站。