多核OCaml中的并行编程

2020-07-06 03:23:58

本教程将帮助您开始使用Multicore OCaml编写并行程序。所有代码示例及其对应的沙丘文件都可以在code/目录中找到。本教程分为以下几个部分:

多核OCaml是OCaml的扩展,本机支持通过域实现的共享内存并行性和通过代数效果实现的并发性。它正在缓慢但稳定地合并到主干OCaml。域-只有多核才会首先登陆,然后是代数效应(Algebraic Effects)。

多核OCaml编译器附带了垃圾收集器的两种变体,即并发次要收集器(ConcMinor)和停止世界的并行次要收集器(ParMinor)。我们的实验表明,在大多数情况下,ParMinor比ConcMinor执行得更好。与ConcMinor破坏C API不同,ParMinor也不需要对编译器的C API进行任何更改。因此,共识是在仅限域的多核向上传输期间继续使用ParMinor。ConcMinor的版本是OCaml 4.06.1,ParMinor已经升级到4.10.0和4.11.0。本文提供了更多关于GC设计和评估的详细信息。

本教程将带您了解如何在多核OCaml中有利可图地编写并行程序。这里没有涉及到效果处理器的故事,对于感兴趣的人,请务必查看本教程和示例。

虽然将多核位向上传输到主干的工作仍在进行中,但可以在多核Opam的帮助下开始使用多核OCaml。Multicore OCaml 4.10.0编译器和domainlib的安装说明可以在这里找到,其他可用的编译器变体在这里。

在您的多核交换机上安装utop也很有用。opam install utop应该可以开箱即用。

设n=n*n令x=5令y=10令_=令d=Domain.spawn(Fun_-&>;Square x)在令Sx=Domain.Join d in Printf.printf";x=%d,y=%d\n";Sx Sy中设Sy=Square y。

域。加入%d个块,直到域%d运行完成。如果域在执行后返回结果,Domain.Join d也会返回该值。如果它引发未捕获的异常,则抛出该异常。当父域中断时,所有其他域也会终止。为了确保域运行到完成,我们必须加入该域。

请注意,x的平方是在新域中计算的,y的平方是在父域中计算的。

确保使用多核交换机构建此示例和本教程中遇到的所有其他后续示例。

这些错误通常意味着用于编译代码的开关不是amulticore开关。使用多核交换机应该可以解决这些问题。

Domainslb是一个用于多核OCaml的并行编程库。它提供了以下API,使得只需对顺序代码进行少量修改即可轻松并行化OCaml代码:

在域部分,我们了解了如何通过派生新域在多核上运行程序。如果我们使用这种方法并行执行代码,我们经常会发现自己在同一个程序中多次催生和加入新域。创建新域是一项代价高昂的操作,我们应该尽可能地加以限制。任务池允许我们在程序开始时产生的同一组域中执行所有并行工作负载。让我们看看如何让任务池正常工作。

注意:如果您在utop上运行此命令,请使用此之前的散列运行#Required";domainlib&34;。

我们已经创建了一个具有三个新域的新任务池。父域也是该池的一部分,因此它是一个由四个域组成的池。在池设置之后,我们可以使用这个池来执行我们想要并行运行的所有任务。setup_pool函数要求我们指定要在任务池中派生的新域的数量。启动任务池的理想域数等于可用核心数。由于父域也是池的一部分,因此num_domain参数应该比可用核心数少一。

强烈建议您在执行所有任务后关闭任务池,尽管这并不是绝对必要的。这可以按照如下方式完成

现在池已停用,不再可用,因此请确保仅在所有任务完成后才执行此操作。

PARALLEL_FOR是Task API中一个功能强大的原语,它可以很好地伸缩,只需对顺序代码进行很小的更改。

首先,让我们编写一个函数的顺序版本,该函数执行两个矩阵的矩阵乘法并返回结果。

设乘矩阵a b=设i_n=Array.length a,设j_n=Array.length b.(0)设k_n=Array.length b.(0)设res=Array.make_Matrix i_n j_n 0 in for i=0 to i_n-1 do for j=0 to j_n-1 do for k=0 to k_n-1 do res(I).(J)<;-决议(I)(J)+a(I)(K)*b(K)(J)已完成。

与MulticoreOCaml上下文中的列表相比,数组提供了更高的效率。尽管它们在函数式编程中通常不受欢迎,但为了提高效率而使用数组是一种合理的权衡。

设并行矩阵倍增池a b=设i_n=Array.length a,设j_n=Array.length b.(0),设k_n=Array.length b in Res=Array.make_Matrix i_n j_n 0 in Task.Parallel_for pool~Chunk_Size:Chunk_Size~start:0~Finish:(i_n-1)~body:(Fun i->;对于j=0到j_n-1,Do for k=0到k_n-1做响应(I).(J)<;-解析(I).(J)+a.(I).(K)*b.(K).(J)完成);解析。

我们可以观察到平行译本和顺序译本之间的许多不同之处。并行版本采用一个额外的参数池,这是因为PARALLEL_FOR在该任务池中存在的域上执行for循环。虽然可以在函数本身内部初始化任务池,但在整个程序中使用单个任务池总是更好的。如前所述,这是为了将创建新域名所涉及的成本降至最低。也可以创建一个全局任务池并跨整个使用它,但是为了能够更好地对您的代码进行推理,建议将其用作函数参数。

我们将检查PARALLEL_FOR的参数。正如前面所讨论的,它接受池中的start和Finish,因为名称suggest set是循环迭代的起始值和结束值,body包含要执行的实际循环体。顺序版本中不存在的一个参数是CHUNK_SIZE。块大小决定了在多核上执行任务时的粒度。理想的块大小取决于以下因素的组合:

循环的性质:在决定要使用的CHUNK_SIZE时,需要考虑与TheLoop有关的两件事,一是TheLoop中的迭代次数,二是每次迭代所需的时间。如果每次迭代花费的时间大致相等,那么CHUNK_SIZE可以是迭代次数除以核心数量。另一方面,如果每次迭代花费的时间不同,那么块应该更小。如果总迭代次数是一个相当大的数字,像32或16这样的CHUNK_SIZE是可以安全使用的,而如果迭代次数很低,比如10,则CHUNK_SIZE为1会执行得最好。

机器:不同机器的最佳块大小各不相同,建议尝试一系列值,以找出在您的机器上效果最好的值。

在并行FOR的帮助下,我们已经实现了16倍的加速。当可并行工作负载可用时,很有可能实现线性加速。

请注意,并行代码的性能在很大程度上取决于机器,这里描述了一些特定于Linux系统的机器设置,以获得最佳结果。

PARALLEL_FOR有一个隐式的障碍,这意味着在同一池中等待执行的其他任务(如果有的话)只有在完成PARALLEL_FOR中的所有块之后才会启动。因此,我们不必担心在两个PARALLEL_FOR或PARALLEL_FOR之后的其他操作之间显式地创建和插入障碍。考虑这个场景:我们有三个矩阵m1、m2和m3。我们要计算(m1*m2)*m3,其中*表示矩阵乘法。为简单起见,我们假设这三个矩阵都是大小相同的方阵。

让Parallel_Matrix_Multiply_3池m1 m2 m3=let size=Array.length m1 in(*存储M1*m2*)let res=Array.make_Matrix size size 0 in(*store M1*m2*)let res=Array.make_Matrix size size 0 in Task.parallel_for pool~chunk_size:(size/num_domain)~start:0~Finish:(size-1)~body:(Fun i->;对于j=0到size-1,当k=0到size-1时,执行以下操作。-t.(I).(J)+m1.(I).(K)*m2.(K).(J)完成);Task.Parallel_for pool~Chunk_Size:(Size/Num_Domain)~Start:0~Finish:(Size-1)~Body:(Fun i-&>Do for k=0 to Size-1 Do for k=0 to Size-1 Do For k=0 to Size-1 Do)(I).(J)<;-决议(I)(J)+t(I)(K)*m3(K)(J)已完成);决议(I)+t(I).(K)*m3(K).(J)已完成);

在假设PARALLEL_FOR没有隐式障碍的情况下,在上面的示例中,ress的计算很可能不正确。因为我们已经有了一个隐式的障碍,所以我们会得到正确的计算。

像上面这样的顺序for循环,从开始到结束,以完全相同的顺序运行迭代。在PARALLEL_FOR的情况下,执行顺序是任意的,并且在完全相同的代码的两次运行之间变化。如果迭代顺序对于代码按预期工作很重要,建议谨慎使用PARALLEL_FOR。

如果循环中有任何依赖关系,比如当前迭代依赖于前一次迭代的结果,那么如果使用PARALLEL_FOR,代码的正确性很有可能不再成立。任务API有一个原语PARALLEL_SCAN,在这样的场景中可能会派上用场。

Parallel For让我们可以轻松地并行化迭代任务。异步等待为并行执行任务提供了更多的灵活性,这在递归功能中特别有用。我们在前面已经了解了如何设置和拆卸任务池。Task API还具有在任务池上运行特定任务的功能。

我们将并行计算斐波纳契数。首先,让我们编写一个顺序函数来计算斐波纳契数。这是一个没有尾递归的朴素斐波纳契函数。

设rec fib n=if n<;2则1否则fib(n-1)+fib(n-2)。

注意到递归情况下的fib(n-1)和fib(n-2)中的两个操作没有任何相互依赖关系,这便于我们并行计算它们。实质上,我们可以并行计算fib(n-1)和fib(n-2),然后将结果相加得到答案。

我们可以通过产生一个新的域来执行计算,并将其连接以获得结果,从而实现这一点。我们在这里必须小心,不要产生比可用核心数量更多的域。

让rec fib_par n d=if d<;=1,则fib n否则让a=fib_par(n-1)(d-1)in b=Domain.spawn(Fun_->;fib_par(n-2)(d-1))in a+Domain.Join b。

我们还可以使用任务池异步执行任务,这样不那么繁琐,伸缩性也更好。

让rec fib_par pool n=如果n=40,则fib n否则让a=Task.async pool(Fun_->;fib_par pool(n-1)),let b=Task.async pool(Fun_-&>;fib_par pool(n-2))in Task.awast池a+Task.await池b。

如果n-lt;=40,则当输入小于40时,我们运行顺序fib函数。当输入的数字足够小时,最好按顺序执行计算。这里我们取40作为阈值,一些实验可以帮助您找到一个足够好的阈值,低于这个阈值就可以按顺序进行计算。

Task.async异步执行池中的任务,并返回一个承诺,这是一个尚未完成的计算。在执行运行到完成后,其结果将存储在承诺中。

Task.await等待承诺完成其执行,一旦执行完成,就会返回任务的结果。如果任务引发未捕获的异常,AWAIT也会引发相同的异常。

通道充当域之间数据通信的媒介。它们可以在多个发送域和接收域之间共享。多核OCamlcome中的频道有两种风格:

有界:具有固定大小的缓冲通道。缓冲器大小为0的信道对应于同步信道,缓冲器大小为1的信道给出MVarStructure。可以创建具有任意缓冲区大小的有界通道。

无界:无界通道对它们可以容纳的对象数量没有限制,它们只受内存可用性的限制。

在let msg=Chan.recv c in Domain.join send;Printf.printf";消息:%s\n";msg中打开Domainlib let c=Chan.make_bound 0 let_=let send=Domain.spawn(Fun_->;Chan.send c";hello";)

在上面的示例中,我们有一个大小为0的有界通道c。任何到该信道的发送都被阻止,直到遇到相应的RECV。因此,如果在本例中删除recv,程序将无限期阻塞。

在Domain.Join Send;中打开Domainlib let c=Chan.make_bound 0 let_=let send=Domain.spawn(Fun_->;Chan.send c";hello";)。

上面的例子基本上是无限期阻塞的,因为发送器没有对应的接收。如果我们改为创建缓冲区大小为n的有界通道,它可以在通道中最多存储[n]个对象,而不需要相应的接收,超过该值将阻止发送。我们可以使用与上面相同的示例进行尝试,只需将缓冲区大小更改为1即可。

在Domain.Join Send;中打开Domainlib let c=Chan.make_bound 1let_=let send=Domain.spawn(Fun_->;Chan.send c";hello";)。

如果您不想阻塞send或recv,那么send_poll和recv_polt可能会派上用场。它们返回一个布尔值,如果操作成功,则返回TRUE,否则返回FALSE。

在Domain.Join Send;中打开Domainlib let c=Chan.make_bound 0let_=let send=Domain.spawn(Fun_->;let b=Chan.send_polc";hello";in Printf.printf";%B\n";b)。

由于缓冲区大小为0,并且通道不能容纳任何对象,因此此程序将打印FALSE,

打开Domainlib let Num_Domones=try int_of_string Sys.argv。(1)with_->;4 let c=Chan.make_bound Num_Domones let发送c=Printf.printf";发送方:%d\n";(Domain.self():>;int);Chan.send c";howdy!";let recv c=Printf.printf";接收方:%d\n";(Domain.self():>;int);Chan.send c";howdy!";let recv c=Printf.printf";接收。(Domain.self():>;int);Chan.recv c|>;忽略阵列中的let_=let sders=Array.init num_domain(Fun_->;Domain.spawn(Fun_->;send c))中的let Receivers=Array.init num_Domain(Fun_->;Domain.spawn(Fun_->;recv c))。Array.iter Domain.Join发送器中的let Receivers=Array.init num_Domain(Fun_->;Domain.spawn(Fun_->;recv c))

既然我们已经对渠道的工作方式有了一定的了解,让我们来考虑一个更现实的例子。我们将了解如何编写在多个域上执行任务的通用任务分配器。

模块C=Domainslb。chan let num_domain=try int_of_string Sys.argv(1)with_->;4 let n=try int_of_string Sys.argv(2)with_->;100类型';a|Quit let c=C.make_unbound()let create_work task=Array.iter(Fun t->;C.send c(Task T))Tasks;for_=1 to num_domain do C.发送c退出完成let rec worker f()=将C.recv c与|任务a->;f a;worker f()|Quit->;()let_=let Tasks=Array.init n(Fun i->;i)在create_work任务中匹配;let factorial n=let rec aux n acc=if(n>;0)则AUX(n-1)(acc*n)Else Acc in AUX n 1 in let Results=Array.make n 0 in let update r i=r.(I)<;-letDomones中的factorial i=Array.init(num_domain-1)(FUN_->;Domain.spawn(worker(Update Result)in Worker(Update Results)();Array.iter Domain.Join Domain.Array.iter(Printf.printf。

我们已经创建了一个无界通道c,它将充当所有任务的存储区。这里我们将关注两个函数:create_work和worker。

create_work接受一组任务,并将任务的所有元素推送到通道c。Worker函数从通道接收任务,并以接收到的任务为参数执行函数f。它不断递归,直到遇到退出消息,这就是我们向通道发送退出消息的原因,这表明工作进程可以终止。

通过在所有域上运行Worker函数,我们可以使用此模板在多个核心上运行任何任务。此示例运行一个简单的factorialfunction。任务的粒度也可以调整,例如,通过在Worker函数中更改它,Worker可以运行一系列任务,而不是单个任务。

在多核OCaml中编写并行程序时,经常会遇到开销,这可能会降低代码的性能。本节介绍发现并修复这些开销的方法。Linux性能和多核运行时中的事件日志对于性能调试特别有用。在本节中,我们将使用它们进行性能调试。让我们借助一个例子来做这件事。

使用标准库的Array.init进行数组初始化是按顺序进行的。程序中的并行工作负载将根据使用的核心数量进行扩展,而初始化在所有情况下都需要相同的时间。这可能会成为并行工作负载的瓶颈。

对于浮点数组,我们有Array.createFloat,它创建一个新的浮点数组。我们可以使用它来分配一个数组,并并行执行初始化。让我们并行地对具有随机数的浮点数组进行初始化。

这是一个简单的实现,它将用随机数初始化数组的所有元素。

打开Domainlib let num_domain=try int_of_string Sys.argv(1)with_->;4 let n=try int_of_string Sys.argv(2)with_->;(1)with_->;4 let n=try int_of_string Sys.argv.(2)with_->;100000设a=Array.create_Float n let_=let pool=Task.setup_pool~num_domain:(num_domain-1)in Task.parallel_for pool~chunk_size:(n/num_domain)~start:0~Finish:(n-1)~Body:(Fun i->;Array.set a i(随机.。浮动1000.));Task.teardown_pool池。

当我们原本期望在多个内核中看到加速执行时,我们在这里看到的却是代码随着内核数量的增加而变慢。代码有问题,看起来不对劲。

我们可以看到,Random Bits的开销高达87.99%。通常情况下,我们没有一个原因可以归因于这些管理费用,因为它们非常特定于程序。可能需要稍微仔细检查一下才能找出是什么原因造成的。在这种情况下,Random模块在所有域之间共享相同的状态,当多个域同时尝试访问它时,这会导致争用。

为了克服这一点,我们将为每个域使用不同的状态,这样就不会因为共享状态而引起争用。

模块T=Domainslb。任务let n=try int_of_string Sys.argv(2)with_->;1000 let num_domain=try int_of_string Sys.argv.(1)with_->;4 let arr=Array.create_Float n let_=letDomain=T.setup_pool~num_domain:(num_domain-1)in let state=Array.init num_domain(Fun_->;T.parallel_for domain中的Random.State.make_self_init()~chunk_size:(n/num_domain)~start:0~Finish:(n-1)~body:(Fun i->;let d=(Domain.self():>;int)mod num_domain in。

..