Concurrencpp –一个C ++并发库

2021-01-02 08:21:55

concurrencpp是C ++的任务库,允许开发人员通过使用任务,执行程序和协程轻松安全地编写高度并发的应用程序。使用concurrencpp应用程序可以将需要异步处理的大型过程分解为可并行运行并可以在内部运行的较小任务。合作的方式来达到所需的结果。concurrencpp还允许应用程序使用并行协程轻松编写并行算法。

能够编写现代并发代码,而不必依赖诸如锁和条件变量之类的低级并发原语。

能够编写高度并发和并行的应用程序,这些应用程序可以根据需要自动扩展以使用所有硬件资源。

通过使用C ++ 20协程和co_await关键字,可以轻松地编写非阻塞,类似同步的代码。

通过使用具有内置同步功能的高级对象,减少争用条件,数据争用和死锁的可能性。

concurrencpp是一个以任务为中心的库。任务是异步操作。与传统的以线程为中心的方法相比,任务为并发代码提供了更高级别的抽象。任务可以链接在一起,这意味着任务将它们的异步结果从一个传递到另一个,其中一个任务的结果就好像它是另一个正在进行的任务的参数或中间值一样使用。与可以使用原始线程相比,任务可以使应用程序更好地利用可用的硬件资源,并且规模更大,因为任务可以挂起,等待另一个任务产生结果,而不会阻塞底层的OS线程。任务使开发人员可以将更多的精力放在业务逻辑上,而不必将其放在线程管理和线程间同步等低层概念上,从而为他们带来了更高的生产力。

任务指定必须执行哪些操作时,执行程序是指定执行任务的位置和方式的工作对象。执行人员可以自己为应用程序分配线程池和任务队列的管理权限。执行程序还通过提供用于创建和调度任务的统一API,将那些概念与应用程序代码分离开来。

任务使用结果对象相互通信。结果对象是一个异步管道,它将一个任务的异步结果传递给另一个正在进行的任务。可以以无阻碍的方式等待和解决结果。

这三个概念-任务,执行程序和相关结果是concurrencpp的基础。执行程序运行通过结果对象发送结果而彼此通信的任务。任务,执行程序和结果对象共生地协同工作,以生成快速,干净的并发代码。

concurrencpp是围绕RAII概念构建的。为了使用任务和执行程序,应用程序在主函数的开头创建一个运行时实例。然后,运行时用于获取现有执行程序并注册新的用户定义的执行程序。执行程序用于创建和调度要运行的任务,它们可能会返回结果对象,该结果对象可用于将异步结果封送给另一个充当其使用者的任务。运行时被破坏时,它将遍历每个存储的执行程序并调用其关闭方法。然后,每个执行者都会正常退出。计划外的任务被销毁,尝试创建新任务将引发异常。

#include" concurrencpp / concurrencpp.h"#include< iostream> int main(){concurrencpp :: runtime运行时;自动结果=运行时间。 thread_executor()->提交([] {std :: cout<<" hello world"<< std :: endl;});结果。得到();返回0;}

在这个基本示例中,我们创建了一个运行时对象,然后从运行时获取了线程执行程序。我们使用Submit传递lambda作为给定的可调用对象。该lambda返回void,因此,执行程序返回结果。将异步结果封送回调用方的对象。 main调用将阻塞主线程,直到结果准备就绪为止。如果没有抛出异常,则get返回void。如果抛出异常,请重新抛出该异常。异步地,thread_executor启动一个新的执行线程并运行给定的lambda。它隐式co_return void并且任务已完成。然后将main解锁。

#include" concurrencpp / concurrencpp.h"#include< iostream>#include< vector>#include< algorithm>#include< ctime>使用命名空间concurrencpp; std :: vector< int> make_random_vector(){std :: vector< int> vec(64 * 1' 024); std :: srand(std :: time(nullptr));对于(auto& i:vec){i = :: rand(); } return vec;} result< size_t> count_even(std :: shared_ptr< thread_pool_executor> tpe,const std :: vector< int& vector){const auto vecor_size = vector。尺寸(); const auto concurrency_level = tpe-> max_concurrency_level(); const auto chunk_size = vecor_size / concurrency_level; std :: vector< result< size_t>> chunk_count; for(auto i = 0; i< concurrency_level; i ++){const auto chunk_begin = i * chunk_size; const auto chunk_end =块开始+块大小;自动结果= tpe->提交([& vector,chunk_begin,chunk_end]()-> size_t {返回std :: count_if(vector。begin()+ chunk_begin,vector。begin()+ chunk_end,[](auto i){返回i% 2 == 0;});}); chunk_count。 emplace_back(std :: move(result)); } size_t total_count = 0; for(自动&结果:chunk_count){total_count + = co_await结果; } co_return total_count;} int main(){concurrencpp :: runtime运行时; const auto vector = make_random_vector();自动结果= count_even(运行时.thread_pool_executor(),向量); const auto total_count =结果。得到(); std :: cout<< "有" << total_count<< "向量"中的偶数<< std :: endl;返回0;}

在此示例中,我们通过创建运行时对象来启动程序。我们创建一个填充有随机数的向量,然后从运行时获取thread_pool_executor并调用count_even。 count_even是一个协程,它产生更多的任务并协同唤醒它们以在内部完成。 max_concurrency_level返回执行程序支持的最大工作程序数量,在线程池执行程序的情况下,工作程序数量是根据核心数量计算得出的,然后对数组进行分区以匹配工作程序数量,并在其数组中发送要处理的每个块自己完成任务。工作人员异步计算每个块包含多少个偶数,并共同返回结果。 count_even通过使用co_await拉计数来对每个结果求和,然后最终结果co_return。被调用get阻塞的主线程被取消阻塞并返回总计数。main打印偶数个数并且程序正常终止。

Warning: Can only detect less than 5000 characters

线程池执行程序-维护线程池的通用执行程序。线程池执行程序适用于不阻塞的短CPU绑定任务。鼓励应用程序将此执行程序用作非阻塞任务的默认执行程序。concurrencpp线程池提供动态线程注入和动态工作平衡。

阻塞执行程序-具有更大线程池的线程池执行程序。适用于启动简短的阻止任务,例如文件io和db查询。

线程执行程序-执行程序,它启动每个排队的任务以在新的执行线程上运行。线程不被重用。此执行程序对于长时间运行的任务(例如运行工作循环的对象或长时间阻塞的操作)很有用。

辅助线程执行程序-维护单个任务队列的单个线程执行程序。当应用程序需要专用线程来执行许多相关任务时,此方法非常适合。

手动执行程序-本身不执行协程的执行程序。应用程序代码可以通过手动调用其执行方法来执行先前排队的任务。

可派生执行程序-用户定义的执行程序的基类。尽管可以直接从concurrencpp :: executor继承,但是derivable_executor使用CRTP模式,该模式为编译器提供了一些优化机会。

内联执行程序-主要用于替代其他执行程序的行为。使任务入队等效于内联调用它。

执行程序的裸机制封装在其enqueue方法中,该方法将要执行的任务排队,并有两个重载:一个重载接收单个任务对象作为参数,另一个重载接收任务对象的范围,第二个重载是用于排队一批任务。这样可以更好地安排试探法并减少争用。

应用程序不必单独依赖排队,concurrencpp :: executor提供了一个API,用于通过将用户可调用对象转换为幕后任务对象来调度用户可调用对象。应用程序可以请求执行者返回一个结果对象,该结果对象对可调用对象的异步结果进行了编组。提供可调用的。这是通过调用executor :: submit和execuor :: bulk_submit完成的。提交获取可调用对象,并返回结果对象。 executor :: bulk_submit获取可调用对象的范围并以类似于提交工作的方式返回结果对象的向量。在许多情况下,应用程序对异步值或异常不感兴趣。在这种情况下,应用程序可以使用executor ::: post和executor :: bulk_post安排要执行的可调用对象或可调用对象的范围,而且还告诉任务删除任何返回的值或引发的异常。不对异步结果进行封送比对封送进行封送要快,但是那样我们就无法知道正在进行的任务的状态或结果。

post,bulk_post,submit和bulk_submit在幕后使用底层调度机制。

可以使用concurrencpp结果对象来消耗异步值和异常。结果对象是异步结果的管道,例如std :: future。当任务完成执行时,它要么返回有效值要么抛出异常。 ,此异步结果将编组到结果对象的使用者。因此,结果状态从空闲(异步结果或异常尚未准备好)到值(通过返回有效值终止的任务)到异常之间不等(任务因引发异常而终止)。

结果对象是仅移动类型,因此,将它们的内容移动到另一个结果对象后将无法使用它们。在这种情况下,结果对象被认为是空的,并尝试调用除操作符bool和operator =之外的任何方法。将异步结果从结果对象中拉出之后(通过调用get,await或await_via),结果对象变为空。可以使用操作员布尔来测试是否为空。

可以通过调用result :: wait,result :: wait_for,result :: wait_until或result :: get中的任何一个来等待结果。等待结果是一项阻塞操作(在异步结果未准备好的情况下),并将暂停等待异步结果可用的整个执行线程。通常不鼓励等待操作,并且仅在根级任务或允许它的上下文中允许等待操作,例如阻塞主线程以等待应用程序的其余部分正常完成,或者使用concurrencpp :: blocking_executor或concurrencpp :: thread_executor。

等待结果意味着暂停当前协程,直到异步结果准备就绪。如果从关联的任务返回了有效值,则会从结果对象中减去该有效值。如果相关任务抛出异常,则将其重新抛出。在等待时,如果结果已经准备就绪,则当前协程将立即恢复。否则,它会由设置异步结果或异常的线程恢复。

等待结果对象的行为可以通过使用await_via进行进一步的微调,该方法接受一个执行程序和一个布尔标志(force_rescheduling)。如果在等待的时候结果已经准备好了,则该行为取决于force_rescheduling如果force_rescheduling为true,则当前协程被强制挂起并在给定的执行器中恢复;如果force_rescheduling为false,则当前协程将立即在调用线程中恢复;如果异步结果在等待时尚未准备好,则当前通过设置结果在给定的提取器中运行,协程可以在设置结果后恢复。

解决结果类似于等待结果。不同之处在于,co_await表达式将以就绪状态返回非空形式的结果对象本身。然后可以通过使用get或co_await来获取异步结果。就像await_via一样,resolve_via通过传递执行程序和标志来微调协程的控制流,该执行器和标志建议在结果准备好后如何执行操作。

通过使用co_await(并同时将当前函数/任务也转换为协程)来等待结果对象是使用结果对象的首选方式,因为它不会阻塞底层线程。

class result {/ *创建与任何任务都不相关的空结果。 * / result()noexcept =默认值; / *销毁结果。相关任务不会被取消。析构函数不会阻止等待异步结果准备就绪。 * /〜result()noexcept =默认值; / *将rhs的内容移动到* this。调用之后,rhs为空。 * / result(result&&rhs)noexcept =默认值; / *将rhs的内容移动到* this。调用之后,rhs为空。返回* this。 * / result&运算子=(result&&rhs)noexcept =默认值; / *如果这是非空结果,则返回true。如果this->运算符bool()为假,则应用程序不得使用此对象。 * /运算符bool()const noexcept; / *查询* this的状态。返回值可以是result_status :: idle,result_status :: value或result_status :: exception中的任何一个。如果* this为空,则抛出concurrencpp :: errors :: empty_result。 * / result_status status()const; / *当status()!= result_status :: idle时,阻塞当前执行线程,直到准备好该结果。如果* this为空,则抛出concurrencpp :: errors :: empty_result。 * / void wait(); / *阻塞,直到准备好结果或持续时间为止。解锁后返回此结果的状态。如果* this为空,则抛出concurrencpp :: errors :: empty_result。 * / template< <等级持续时间单位,等级比率> result_status wait_for(std :: chrono :: duration< duration_unit,rat

......