C ++的通用I / O抽象(2020)

2021-04-19 21:57:31

本文是C ++通用异步抽象的续集,其中我讨论了针对C ++ 23的执行程序提案。从那时起发生了一些误。

SG-1 1,研究小组指控所有事物的并发和并行性,使得延伸的提案 - 借鉴了在C ++ 23草案中登陆未来修订的希望。这一工作有了很重要的消息一直酝酿大约十年。

Subjint in Connect和Start的分裂现在是纸张的对象。这是一个非常重要的谜题,我期待在下个月在布拉格讨论它。

最后,但也许更重要的是,Facebook发布了一个名为LibUnifex的发件人/接收者和调度程序的开源实现。这不是P0443的完全实现,它具有更多的功能和算法,但它实现了相同的基本设计和架构。幸运的是,它不使用概念,所以我愚蠢地坚持尝试实现我的C ++ 20Library。幸运的是,科加州被合并为GCC,概念被合并到Clang中,所以现在可以实施执​​行者提案的许多编制者。

调度员概念,您可以在给定的上下文上安排操作(例如线程池)

executor概念,它允许您在给定的上下文(例如线程池)上执行函数。我们看到了Executor如何与executor :: Execute(Func)的概念相比,这可能只是一个用于提交的CPO(计划(调度程序),std :: AS_Receiver(Func))2。

能够在执行上下文上运行代码,例如线程很棒。但是,如果你想在稍后运行代码怎么办?也许一些代码的部分需要每5分钟运行一次:

void child(){while(true){fmt :: print("我们还在吗?"); thred_thread ::睡眠(5min); }} int main(){scheduler auto s = /*...*/执行::执行(s,as_receiver(child));}

这将工作3.但是,没有别的东西会在那个线程上运行,这是一个相当较差的资源使用.Threads比过程昂贵,但他们仍然需要时间来创建:如果您有数以千计的任务,则避免每项任务有一个线程。

我们想要的是任务而不是要打断5分钟的线程。

实际上,任务需要等待时,有许多情况,遍布一个线程:

所有这些操作都可以称为“I / O”,并且在具有内核的平台上,它们通常由内核处理。

例如,调用::读取功能时,内核将暂停调用线程,直到该设备可用于该设备的数据,并调度另一个线程。当数据可用时,线程可以安排回。

这种舞蹈有成本。一个相当小的,你需要创建数百或数千个线程来注意。大多数成本可能来自缓存失效而不是上下文切换本身。

而不是让内核进行调度,而是有系统API,让我们在用户空间中执行调度。

请在文件描述符或句柄上可用数据时,请求内核通知我们

这些posix(epoll是linux特定的)API在这里有不同的行为,因为Julia Evans覆盖了比我能更好的话题。

它阻止,直到至少一个文件描述符已准备好读取或写入

这可能发生在单个线程上(在程序开始等待文件描述符事件之前会熄灭某些任务)或在多个线程中发生,在这种情况下,我们需要同步文件注册。更稍后的更多信息。

例如,反应堆的一个问题是,对于文件的每个读取操作,例如,我们必须:

系统调用相对昂贵,因此在它们有足够的数据之前恢复任务。要对该问题进行比较,更现代的异步I / O API(如AIO(POSIX)或IoCP(Windows),将合并轮询和读取操作。

这减少了Syscalls的数量,并才能才能履行所需的I / O才能完成所需的任务。内核可能会产生自己的工作线程池来执行I / O操作,没有任何东西是真正的自由。但是,这比执行更多系统调用是更高效的。这是工作流程是Proactor模式。

但是(总有一个但是,不在那里?)。虽然人们在Windows上做了异步I / O for Windows(也许是因为Windows上的文件操作很慢),但是Linux上的AIO是不必要的(同步I / o足够快) - 或不足(延迟太多)。事实上,Linux上的事实是在用户空间中实现的 - 但可以使用类似的内核API IO_Submit。在任何情况下,这些API都旨在处理文件I / O,因此无法使用它,以便在所有情况下都会更好地使用它。

也许更多的感兴趣的C ++,人们认为无法设计一个有效的界面,这些接口会介绍一个文件和套房。这解释了为什么我们将ASIO和AFIO作为不同的接口的不同项目,而不是一些通用异步系统。 ,例如libuv或tokio。

Beyoncé说,如果你喜欢它,你应该把一个戒指放在它上面4.Well,我非常喜欢发件人/接收者以及标准通用但有效/ o调度程序的想法,所以也许我们应该把戒指放在上面。更具体地说,是一个IO_URICE。

IO_调节是Linux内核中的一个令人兴奋的新功能,可以允许设计高效,异步框架,该框架也适用于(缓冲和无缓冲)的文件I / O和诸如套接字的其他设备。 IO_调节被添加到Linux 5.1 5作为AIO和IO_SUBMIT的替代,但从那以后,它已经改进了对套接字的支持。它可能变成了一般的异步系统调用界面。

IO_uring基于2队列(一个用于提交的队列,一个完成),即在内核之间共享。即使内核写入它,应用程序线程也可以从提交队列中读取内核。

队列是无锁的单个消费者,单个生产商环(因此名称).since linux 5.5内核将保持溢出列表以保持完成,直到完成队列中的空间。

同样,应用程序必须注意不要溢出提交队列。提交队列只能在6点一次被单线程访问。

一旦将工作添加到戒指中,单个系统IO_URING_ENTER调用可以致电在提交队列中提交所有新工作,并等待要添加的条目TOTEDION队列。

void io_context :: run(){io_uring环; io_uring_queue_init(uring_entries,& ring,0); struct io_uring_cqe * cqe;虽然(true){add_pending_operations_to_io_调节(); io_uring_wait_cqe(&环,& cqe); //单个syscall提交和等待自动*操作=操作_from_Completion(CQE); io_uring_cqe_seen(&环,cqe); Execute_Completion(CQE); } io_uring_queue_exit(& m_ring);}

此幻灯片代码具有措施措施,为我们处理非常低级的用户空间环管理。

运行可以在多个线程上执行,每个线程都有自己的ring.但是,每个队列都只能从单个线程访问一次.OO_URE_WAIT_CQE存在,因为名称建议呼叫,我们如何为队列添加工作?

首先,我们需要一种线程安全的方法来将操作推向上面表示在上面的图形上表示的转换队列缓冲器7。

class io_context {std :: mutex互斥;侵入性<操作*>待办的; void start_operation(操作* op){std :: unique_lock _(mutex); pending.push(op); }};

但是,如果I / O线程当前在IO_URING_WAIT_CQE中被阻止,则如何看到我们添加了队列的元素?

进入和离开IO_URE处理会引发SYSCALL和上下文切换,并且更普遍存在CPU周期。

根据超时的值,它会增加延迟并导致操作开始的延迟,并且当内核开始执行I / O请求时。

相反,我们可以在IO /线程中的伪文件句柄上安排读取操作,并且在发件人线程中,写入该文件描述符,这将导致IO_URE_WAIT_CQETO返回。

在Linux上,我们可以使用EventFd,据我所知,这是最有效的方式做小舞蹈。

class io_context {std :: mutex互斥; std ::队列<操作*>待办的; int fd = :: eventfd(0,O_NONBLOCK); EventFd_t虚拟; void run(){schedule_notify(); while(true){// - io_uring_wait_cqe(& ring,& cqe); if(cqe-> user_data == this){schedule_notify(); //重新臂} // ...}} void schedule_notify(){auto sqe = io_uping_get_sqe(& m_ring); io_uring_prep_poll_read(sqe,fd,& adummy,sizeof(假)); io_uping_set_data(sqe,这个); void start_operation(操作* op){std :: unique_lock _(mutex); pending.push(op); EVENTFD_WRITE(FD,0); //导致IO_URING_WAIT_CQE返回}};

这种对eNqueue工作的机制不具体到IO_URICE,也将用于ePOLL,选择,IO_submit等。

这种方式通知队列和等待完成事件的方式产生了一些超过数千次IOPS的初始化。当这可能似乎不是一个问题,具有较新的标准,如PCI4 / PCI5和相应的驱动器和网络硬件,I / O开始与内核绑定的CPU是瓶颈。

为了实现这种效果,IO_uration提供了一种轮询模式,它可以在一些umerecases中允许非常高的吞吐量。 P2052主张在标准中支持这些模式。

在C ++的通用异步抽象中,我们讨论了在给定调度程序关联的执行上下文上运行操作的调度算法

Oneway_task do_something(执行:: scheduler auto s){co_await执行:: schedule(s); fmt ::打印("你好"); //在与Scheduler S}关联的上下文中运行

既然我们了解IO上下文,我们可以runio操作的IO上下文,我们可以将截止日期参数添加到计划8 algorithm.i从p1031 - 低级文件I / O库中窃取了截止日期的想法.it是一个简单的可以代表时间,相对或绝对的实用程序

任务undiping_child(执行:: scheduler auto s){whis(true){//暂停任务5分钟,//线程在平时co_await执行:: schedule(s,5min)中可以自由地做其他事情; FMT ::打印("我们还在吗?"); }}

在这里,执行::时间表(s,5min);返回一个发件人,就像我们最后一次看到调度算法一样。唯一的区别是启动方法将导致内核计划的超时“I / O”操作。

IO_INCE恰好有内置超时支持。 其他调度程序可以在Windows上使用Timerfd或CreateThreadPoolTimer。 在各种模式中读取,写入文件描述符(文件,套接字,管道,其他“文件)”对象“ 更有可能的是,我们得到了一些IO对象,如文件和套接字 模板<执行:: scheduler scheduler = std :: default_scheduler> 类文件;任务read_data(执行:: scheduler auto s,buffers& buff){file f(s); co_await f.open(" myfile.txt"); co_await f.read(beffs); co_await f.close();} 有一个有限的,固定的硬件线程数,与RAM不同,无法下载更多。 因此,理想情况下,程序应最多使用相同数量的常态线程,因为存在活动线程。 不幸的是,独立的库可以使用自己的线程和线程池.I / o库可能会创建自己的循环,以及每个图形框架。

标准库在内部用于并行算法和STD :: Async.USYNC.USYNC.在某些实现中,每个STD :: ASYNC调用有一个线程(其中一个原因之一,为什么std :: std :: async是可怕的)。

虽然我们可以一次转换1000个传染媒介的元素,但它难以同时转换1000次1000次的1000个vectors。或者其他的东西。

这就是为什么P2079 - Executors的共享执行引擎为全局可访问的执行上下文的情况做出了这种情况。

我喜欢那篇论文,但我们真正需要的是全球访问的IO上下文。或者更具体地,全球可访问的IO调度程序。

在让这张脸上(可能不是正确的脸部)之前,在向标准添加单例的想法时令人困惑地吓坏了,值得注意的是,一些平台很久以前就达到了相同的结论并暴露了aglobal I / O背景所有应用程序:

Windows Threads Pools公开默认的线程池 - 可以提交工作 - 包括IO请求。 Microsoft的STL实现使用了这一点。

Apple Platforms有大中央调度,其采用类似地,但具有远冷却的名称。

在其他POSIX平台上没有等效的脱磁性解决方案。虽然单线上下文很简单,但用户空间调度仍在调度,并且调度很难。

有一些可以在Linux上使用的库,例如libdispatch或libuv,或者实现者可以为刮擦烹饪。

C ++中的错误管理被认为是一个简单且解决的问题9.要调整Spice Mission,Asynchrony会添加第三个通道:取消.indeed,取消不是错误10。

但在我们可以谈论处理取消之前,让我们谈谈发出消除request.you通常会取消整个任务或操作,然后将取消整个后续操作链。

例如,如果我们取消读取,则不应执行写入。[p1677]取消中提到的,是从函数早期返回的异步版本。

这是基于与C#的CunellationToken和JavaScript的AbortControll相同的想法。

stop_source可以创建令牌,stop_token有一个stop_requested方法,retch_source :: request_stop()被调用。此外,调用stop_source :: request_stop()时可以自动触发回调。

连接到同一stop_source的所有令牌和回调共享Samethread-Safe Ref计数的共享状态。(您仍然负责确保用作stop_callback的函数是selvesthread-safe,如果您有多个线程,则为selveShread-safe。)

它已在GCC中实现,因此您可以在编译器资源管理器上玩它

#include< stop_token> #include< cstdio> int main(){std :: stop_source stop;自动令牌= stop.get_token(); std :: stop_callback cb(令牌,[] {std :: puts("我不想停止所有\ n");}); std :: puts("唐' t现在阻止我,我有这么美好的时光\ n"); stop.request_stop(); if(token.stop_requested()){std :: puts("好的\ n"); }}

然后可以附加到附加到任何接收器的适当类型120or的Coroutine任务。

然后可以由执行上下文使用自定义点执行:: get_stop_token(执行:: Receiver Auto)来查询是否取消操作。

应在执行上下文中取消操作,它们旨在执行。

在飞行中的I / O操作的情况下,可以将请求发射到内核TOCANCEL该请求(CANCLIO上Windows上的CANCELIO_ASYNC_CANCEL,AIO_CANCEL等)。疑难解决定时器,套接字读取或可能永远不会完美的操作。

在某些时候,我使用了一个停止令牌来停止执行上下文并取消飞行中的所有ThetaSks。这是超级方便的。

不幸的是,灾难作为取消任务的灾难可能导致它是重新安排的另一个任务要在可能被销毁的执行上下文上安排。我必须承认,说服我采取了一些努力(谢谢刘易斯!)。

相反,在完成该上下文上可能运行或安排其他操作的所有操作之前,不应销毁执行上下文。

这可以通过STD :: Async_wait算法来实现我在我的第一个关于executors的帖子中提到的算法。

虽然不是玫瑰:发件人/接收者之间存在一些不匹配的并等待/持续。

Coroutines具有返回值(其是单一类型 - 而接收器支持多个值类型P1341),并且可以重新启动异常13。

任务示例(){inspect(auto res = co_await发件人){< cancelled_t&gt ;: {} res.success():{} res.failure():{}};}

上面的示例展示了模式匹配,虽然我不确定我们可以混合两种类型和表达式匹配器。

我们不能使用类型来区分成功和失败,因为它们可能具有相同类型。

语义 - 使用例外信号取消使它看起来像取消是一个错误,它不是。这样的贫困!

性能 - 对例外的依赖使得它更加难以在嵌入式平台上使用,就像所需的堆分配不够差!除了性能之外,有时缺乏对例外的非常支持。

但是,在真理中,考程内不必使用异常来报告不同的结果。这是Coroutine的简化图.CoroROTINE被暂停,然后在由延续句柄表示的特定点恢复。

我们可以想象有几种可能的持续延续的科稿,以恢复操作的结果。

这将是接收器的更好建模,不会遭受例外的性能和实现问题(以具有更多Coroutine_Handle跟踪的成本。)

Goroutines是Go编程语言的一个特征,与C ++ Coroutines非常不同,因为它们不仅是Stackfull,而且还模拟了恢复机制和调度机制.Go为您提供了一种烘焙的I / O和Coroutines调度程序当它执行I / O时,将代表中断Goroutine的Progroughing处理,尝试获取锁定或任何其他阻塞操作。

C ++ Coroutines不是Goroutines。 C ++ Coroutines并不意味着异步,更不用说Scheduling.c ++不是那种将烘烤I / O调度程序的语言,因为它会戈拉曼斯特“不要支付你不使用的东西”咒语C ++在许多环境中无法使用。

考程,发件人接收器和I / O调度仪的组合可以模拟Goroutine(井,非承诺的堆积).c ++科素也可以用作简单的同步发电机。它是一个更广泛的可扩展系统。

我认为最终目标是因为每个潜在的阻塞呼叫都是互换表达式。就像在go.not烘烤语言思想中,但作为图书馆解决方案。

例如,LibUnifex实现了异步互斥锁(不像ASIO' S Strands),这样您就可以通过恢复Coroutine获取锁:

沿着Goroutines,Go提供频道,这些频道是Go的最佳功能之一.Channels是概念上,相对简单的。频道是一个多营收者,多于队列队列。从队列中暂停Goroutine直到数据可用。写入可以是缓冲的(保存书面数据,写作者可以继续在其侵入状态下) - 或者没有缓冲(写入器被暂停,直到读者准备采取数据).Well ...

使用namespace cor3ntin :: corio;模板<执行:: scheduler scheduler> Oneway_Task Go_Write(Scheduler Sch,Auto W){int i = 10;虽然(i){co_await sch.schedule(std :: chrono :: milliseconds(100)); co_await w.write( - i);模板<执行:: scheduler scheduler> Oneway_task go_read(调度程序sch,auto r,stop_source& stop){whis(true){int值= co_awaitr.read(); std :: cout<< "有价值" <<值<< " \ n&#34 ;; if(value == 0){stop.request_stop();休息; }} int main(){stop_sour

......