项目织机和结构化并发

2020-12-04 21:02:08

1998年,令人惊奇的是Sun Java Web Server(Tomcat的前身)在单独的线程而不是OS进程中运行每个请求。这样就可以满足数千个并发请求!如今,这并不那么令人惊奇。每个线程占用大量内存,典型的服务器上不能有数百万个线程。

因此,服务器端编程的现代口号是:“永不阻塞!”相反,您指定一旦数据可用就应该发生什么。

这种异步编程风格非常适合服务器,使它们可以轻松支持数百万个并发请求。对于程序员而言,它并不是那么好。

HttpClient.newBuilder().build().sendAsync(request,HttpResponse.BodyHandlers.ofString()).thenAccept(response->。。。); .thenApply(。。。); .exceptionally(。。。);

我们通常用语句实现的功能现在被编码为方法调用。如果我们喜欢这种编程风格,则不会在Lisp中使用我们的编程语言来编写语句和编写愉快的代码。

在JavaScript中,标记为“异步”的代码将重写为方法调用,就像您刚刚看到的那样。但这意味着您只能从其他异步方法中调用异步方法,并且您的API分为同步和异步部分,从而迫使您复制功能。

Project Loom从Erlang和Go等语言中获得指导,通过使代码块变得非常便宜来解决问题的根源。您可以在“虚拟线程”中运行任务,“虚拟线程”是一种几乎无限的资源,已映射到实际的“载体”线程中。当一个虚拟线程阻塞时,它被“停放”,并且另一个虚拟线程在承载线程上运行。该名称应该使您想起映射到实际RAM的虚拟内存。

在抱怨名称之前,请记住命名很困难。 Loom团队之前曾尝试使用“纤维”,但已在其他地方使用,其含义略有不同。当出现轻量级或更新的线程时,“轻量级”或“新”线程可能看起来很愚蠢。

在尝试了针对OS线程和虚拟线程的单独类之后,他们最终决定为二者使用单个类(熟悉的java.lang.Thread)以简化迁移。

当然,自Java 1.0以来已有25年历史的老式java.lang.Thread占有一席之地。尴尬的取消,线程组,优先级,过时的方法停止,挂起,恢复。 Loom团队认为这些责任微不足道,因为大多数程序员从来没有明确使用Thread API,而是使用ExecutorService启动任务。 (当然,相同的参数将支持使用更干净的虚拟线程API。)

如果您已经存在很长时间了,您可能还记得Java的早期版本具有映射到OS线程的“绿色线程”。但是,有一个关键的区别。当绿色线程被阻塞时,其承载线程也被阻塞,从而阻止所有其他绿色线程取得进展。

如前所述,虚拟线程是Thread类的对象。这是三种生产纤维的方法。首先,有一个新的工厂方法可以构造并启动一个虚拟线程:

但是,正如您多年以来一直被告知的那样,使用执行程序服务比手动构造Thread实例更好。静态方法Executors.newVirtualThreadExecutor()提供了一种。 (现有的执行程序服务对虚拟线程没有用。将它们合并会适得其反!)

与Executors类中的其他工厂方法一样,您可以选择提供ThreadFactory。新的Thread.Builder类提供了一种提供工厂而不是单个实例的简单方法:

让我们尝试一下。作为第一个测试,我们只是睡在每个任务中。

导入java.util.concurrent。*; public class Test {public static int DELAY = 10_000;公共静态整数NTASKS = 1_000_000; public static void run(Object obj){try {Thread.sleep((int)(DELAY * Math.random())); } catch(InterruptedException ex){ex.printStackTrace(); } System.out.println(obj); } public static void main(String [] args){ExecutorService exec = Executors.newVirtualThreadExecutor(); for(int i = 1; i< = NTASKS; i ++){字符串taskname =" task-" +我; exec.submit(()-> run(任务名)); } exec.close(); }}

运行该程序,它就可以正常工作。然后尝试使用OS线程-更改为Executors.newCachedThreadPool()或Executors.newFixedThreadPool(NTASKS)。该程序将耗尽内存;在大约25,000个线程之后,在我的笔记本电脑上。

好的,但是实际上,您不想睡觉,但是要做一些有用的工作。考虑一个改编自亨氏·卡布兹(Heinz Kabutz)的拼图游戏的程序,该程序每天从Dilbert或Wikimedia获取图像。它由ImageProcessor和ImageInfo类组成。该代码是曲折的段落的一个不可逾越的迷宫,都一样(即辅助函数产生可完成的期货)。

使用虚拟线程,只需同步读取Web内容。它会阻止,但我们不在乎。所有的复杂性都消失了。控制流程简单易懂。

exec.submit(()-> {字符串pageURL = info.getUrlForDate(date);字符串page = getSync(pageURL,HttpResponse.BodyHandlers.ofString());字符串imageURL = info.findImage(page).getImagePath(); byte []图片= getSync(imageURL,HttpResponse.BodyHandlers.ofByteArray()); info.setImageData(image); process(info);返回null;});

专家提示:语句返回null;使lambda成为Callable而不是Runnable,这样您就不必捕获已检查的异常😜

尝试一下您关心的事情。调用Web服务并建立数据库连接,而不必担心回调。当阻塞便宜时,很多偶然的复杂性就消失了。当然,要在网络应用程序框架中使用此代码,您必须等待框架提供程序在虚拟线程中运行代码。

在这篇强烈推荐的文章(从中拍摄以下图像)中,Nathaniel Smith提出了结构化的并发形式。这是他的中心论点。在新线程中启动任务实际上并不比使用GOTO编程好,即有害:

当多个线程在没有协调的情况下运行时,其意粉代码将重新出现。在1960年代,结构化编程将goto替换为分支,循环和函数:

当您查看一行代码时,便知道该程序是如何到达那里的。

结构化并发对于并发任务执行相同的操作。通过阅读程序文本,我们应该知道它们什么时候完成。

这样,我们可以控制任务使用的资源,并且知道何时该继续进行。

在Loom中,ExecutorService实现了此基本构造。 ExecutorService有一个close方法,该方法将阻塞直到其所有虚拟线程都完成为止。 (我在第一个示例程序中使用此方法来使main保持活动状态,直到完成所有虚拟线程为止。过去,您不得不调用awaitTermination方法。)

方便地,ExecutorService实现了AutoCloseable接口,因此您可以只使用try-with-resources语句:

尝试(ExecutorService exec = Executor.newVirtualThreadExecutor()){for(int i = 0; i< NTASKS; i ++){exec.schedule(()-> run(i)); }} //阻塞,直到所有线程完成

我编写了一个简单的Web爬网程序作为虚拟线程的演示,这是Crawler类。在第一次尝试中,我为页面中的每个URL触发了一个新的虚拟线程。如果我想成为Google,可以让我的搜寻器永远运行。但是我想从起点跳到不超过3跳。有了“一劳永逸”,就无法知道何时进行递归。

相反,对于每个页面,我都会提供一个新的执行服务并等待完成。这样,所有页面均已爬网后,整个程序将完成。

这似乎有很多障碍。但是在Loom中,封锁很便宜,因此我们不必为此担心。

我们习惯于使用一个执行程序服务作为所有任务的线程池。但是在Loom中,建议您为每个任务集使用单独的执行程序服务。

爬网时,您很可能会遇到无效链接。从一个人开始阅读最终会超时,但是这可能会花费惊人的时间。

当然,标准补救措施是提供超时。 Loom更喜欢截止日期而不是超时,因此您可以指定

为什么要截止日期?通常,超时的表现不佳。假设您必须完成两个连续的任务,并且总超时时间为10秒。您不想为每个任务指定5秒钟的超时时间。毕竟,如果一个人花了6秒钟,而其他人花了3秒钟,您仍然可以进入终点线。要获得第二个任务的超时时间,您必须测量第一个任务的持续时间,然后从总体超时中减去该时间。有最后期限,这要简单得多。每个任务都有相同的截止日期。

调用exec.close()会阻塞,直到所有虚拟线程都已完成或截止日期到期为止。然后,它取消所有剩余的虚拟线程。

在Java中,您可以通过调用线程的中断方法来取消线程。这仅设置线程的“中断”标志。取消是合作的。线程必须定期轮询“中断”标志。除了调用阻塞操作时,线程无法执行任何轮询。相反,如果阻塞操作检测到线程被中断,则抛出InterruptedException。因此,线程还必须捕获那些异常。在这两种途径中,线程都必须清理资源并终止。这可能很乏味且容易出错,但是这是很好理解的。显然,人们可以想像出更好的方法,但是Loom现在并没有到那里去。

有一项重要的改进。取消虚拟线程在结构上是合理的。假设一个虚拟父线程已经产生了一些子线程,然后将其取消。显然,其目的也是取消子线程。而这正是发生的情况。

此外,在虚拟线程中,取消驻留的操作可能比取消本地阻止的操作要快得多。特别是,取消等待套接字的虚拟线程是瞬时的。

结构化取消并不新鲜。考虑一下ExecutorService.invokeAny,它从Java 1.5开始就存在。该方法同时运行任务,直到一个任务成功完成,然后取消所有其他任务。与CompletableFuture.anyOf相比,它可以使所有任务运行完成,即使除了第一个结果以外的所有结果都将被忽略。

线程局部变量是Project Loom实现者的痛点。快速刷新:您使用ThreadLocal对象存储与当前线程关联的值。调用get,set或remove时,仅影响当前线程的实例。

public static ThreadLocal currentUser = ...; ... currentUser.set(userID); //很久以后,几个函数调用deepint userID = currentUser.get()//获取当前线程的值

普遍引用的线程本地原因是使用非线程安全类,例如SimpleDateFormat。您想要共享一个而不是构造新的格式化程序对象。您无法共享一个全局实例,但是每个线程一个实例是安全的。

该用例不能很好地用于虚拟线程。如果您有100万个虚拟线程,您真的需要那么多SimpleDateFormat实例吗?您可以只切换到线程安全替代方案,例如java.time.formatter.DateTimeFormatter。

线程本地变量有时用于任务全局状态,例如HTTP请求,数据库连接或事务上下文。将这些对象作为方法参数传递是很麻烦的,因此您希望它们可以全局访问。每个任务都有自己的任务,因此您不能有一个全局变量。每个线程一个线程似乎是一个方便的解决方案。

唯一的例外是,在单个线程上执行多个任务的情况并非如此,而线程池正是这种情况。网络上充满了可怕的警告,以确保您删除了线程本地变量,以免它们泄漏到其他任务中。

您真正想要的是局部任务变量。使用Loom,这更有前景,因为一个任务对应一个虚拟线程。

当一个任务产生其他任务时会发生什么?这取决于。有时,孩子应该继承父母的本地人。这就是可继承线程局部变量的用途。

除此之外,当前可继承线程本地实现的成本很高。所有可继承线程的本地变量都复制到每个子线程,以便子线程可以更新或删除它们而不影响父线程。

真正需要的是针对这些任务可继承变量的更好的机制。您可以在此处找到可行解决方案的草图。到目前为止,尚未做出任何决定。

虚拟线程允许线程本地,但默认情况下不允许线程本地继承。您可以使用Thread.Builder方法覆盖任何默认值:

目前,您可以继续使用线程本地,但是如果您使用可继承线程本地,则需要配置线程工厂。如果启动大量的虚拟线程,请注意对内存的影响。

ExecutorService类的Submit方法产生Future。例如,您可以将Callable的值获取为

Callable< Integer>可调用=()-> {Thread.sleep(1000);返回42; }; Future< Integer> future = exec.submit(callable); int结果= future.get(); //阻塞直到任务完成

顺便说一句,Loom向Future接口添加了一个join方法,其工作方式与get一样,但是如果任务被取消或异常终止,则抛出未经检查的CancellationException或CompletionException而不是经过检查的InterruptedException / ExecutionException。

Loom向ExecutorService添加了一个SubmitTask方法,就像Submit一样,但是它产生了CompletableFuture:

CompletableFuture有什么更好的选择?它实现了Future接口并添加了以下方法:

isCompletedExceptionally,以查找任务是否因异常终止(但遗憾的是,没有用于获取异常对象的方法)

我不知道将CompletionStage方法与Loom结合使用是否有价值,而Loom的主要目的是避免链接回调的麻烦。显然,我们不关心手动完成未来。我的猜测是之所以选择CompletableFuture是因为我们需要异常信息,并且没有人愿意在已经杂乱的API中引入另一个类似Future / Promise的类。

除了submitTask,还有一个submitTasks方法与现有invokeAll方法类似。但是,它返回CompletableFuture的列表,其中invokeAll返回Future的列表。

现在您有了一份期货清单。您如何收获结果?您可以遍历列表,并在每个将来致电get:

当然,这些呼叫会阻塞,您可能会很不高兴第一个呼叫花费的时间最长。但是谁在乎-那么所有其他呼叫都是即时的。

但是,如果您不希望获得所有结果,该怎么办?然后,您可以使用CompletableFuture类中新完成的方法在结果到达时进行选择。这类似于ExecutorCompletionService,但是它使用流而不是队列。例如,这里我们收获前两个结果:

流管道阻塞,直到获得前两个结果。最后一行取消其余任务。

我不确定在这里重用现有的ExecutorService和CompletableFuture API是否会成功。我发现许多相似但不同的方法令人困惑。当然,如果其他人也有同样的感觉,则API的详细信息可能会发生变化。

Project Loom通过为线程提供无限数量的线程来重新定义线程。由于我们不必合并线程,因此每个并发任务都可以拥有自己的虚拟线程。而且由于阻塞很便宜,因此我们可以使用简单的控制流(分支,循环,方法调用,异常)来表达逻辑。无需麻烦的回调或异步风格。

因为API是基于我们所知道的(线程,执行程序),所以学习曲线很低。另一方面,子孙后代在必须学习过去的残篇时可能会诅咒我们的懒惰。

仍然存在重要的局限性。特别是,尚未改进对监视器(java.lang.Object锁)和本地文件I / O的阻止,它将阻止载体线程。

请记住,Project Loom不能解决所有并发问题。 如果您有大量计算任务,并且想让所有处理器内核都忙,它对您无济于事。 它对使用单个事件线程的用户界面没有帮助。 当您有很多任务花费大量时间阻塞时,Project Loom的好处就是。 试一下Project Loom,看看它如何与您的应用程序和框架一起工作,并提供有关API和性能的反馈!