使用 RabbitMQ Streams 进行消息重复数据删除

2021-07-31 00:32:00

2021 年 7 月 28 日 RabbitMQ Streams 概述介绍了流,这是 RabbitMQ 3.9 中的一项新功能,RabbitMQ Streams First Application 提供了流 Java 客户端的编程模型的概述。这篇文章介绍了如何在 RabbitMQ Streams 中对已发布的消息进行重复数据删除。由于重复数据删除是一个关键且复杂的概念,本文将逐步引导您了解此机制,从一个幼稚且有些损坏的发布应用程序到优化且可靠的实现。应用程序很容易多次发布相同的消息:应用程序以错误的方式重新启动并从头开始重新发布所有数据,网络故障使应用程序重新连接并重新发送几条消息,等等。尽管消费应用程序应该使它们的处理具有幂等性,但应尽可能避免重复发布的消息,因为它们会减慢处理速度并占用额外的空间。这篇文章将从一个简单的应用程序开始,该应用程序会生成大量重复的消息(以帮助掌握问题),并将一点一点地改进它,最终得到一个强大的解决方案。发布程序模拟一个从数据源读取记录并为这些记录中的每一个发布消息的应用程序:Producer producer = environment。生产者建造者 () 。流(“重复数据删除流”)。建造 (); int messageCount = 10 ;记录( 0 , messageCount )。 forEach(record->{Message message=producer.messageBuilder().addData(record.content().getBytes(StandardCharsets.UTF_8)).build();producer.send(message,confirmationStatus->latch.countDown()) ; });

我们假设应用程序读取了所有可用的记录,并且第一次运行时该数字是 10。如果您想要有关流 Java 客户端 API 的提醒,您可以阅读 RabbitMQ Streams First Application。如果你想在阅读时运行代码,你可以继续下一节。注意你可以按照帖子的其余部分不运行任何东西,所以如果你不想尝试,你可以跳过下一节代码。运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理: docker run -it --rm --name rabbitmq -p 5552:5552 \ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS = '-rabbitmq_stream Adverted_host localhost' \ rabbitmq:3.9 代码托管在 GitHub 上。以下是如何克隆存储库并创建示例中使用的流: 在第一次运行时,应用程序从数据源读取所有记录(总共 10 条记录)对于这次运行)并为每个人发送一条消息。我们可以使用以下命令检查流的内容:./mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$Consume ' 正在连接...已连接。开始消费,按回车退出...message 0message 1message 2message 3message 4message 5message 6message 7message 8message 9

到目前为止,一切都很好,我们发布了 10 条消息,我们可以在流中看到 10 条消息。现在让我们看看我们的应用程序是否可行并在第二次运行时保持正常工作。现在我们可以想象,我们在第二天运行应用程序,数据源包含 10 条额外记录,所以总共有 20 条记录。我们的发布应用程序是愚蠢的:它会从数据源读取所有内容并发布消息。让我们尝试:./ mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$Consume'Connecting...Connected.Starting消耗,按回车退出...message 0message 1message 2...message 9message 0message 1message 2...message 9message 10message 11...message 19 我们看到 30 条消息:第一次运行的 10 条消息和第二次运行的 20 条消息。前 10 条出现两次,所以我们的流包含重复。按照我们实现的方式应用程序这是预期的,但我们必须解决这个问题,因为我们只想在第二次运行时发布新记录。流 Java 客户端文档提供了有关生产者名称和发布 ID 的更多详细信息。注意消息重复数据删除并非特定于流 Java 客户端,只要符合语义,任何客户端都可以实现。我们只需要为我们的发布应用程序选择一个名称,并在不同的运行中保留这个名称。对于发布 ID,我们可以使用记录的 ID:它恰好是唯一的,并且记录按 ID 排序返回(例如只是像数据库中带有数字主键和适当查询的记录)。生产者生产者 = 环境。生产者建造者 () 。流(“重复数据删除流”)。 name ("app-1") // 为生产者提供一个名字。 ConfirmTimeout ( Duration . ZERO ) // 永不停止重试。建造 (); int messageCount = 10 ;记录( 0 , messageCount )。 forEach ( record -> { Message message = producer .messageBuilder ().publishId ( record . id ()) // 设置发布ID . addData ( record . content ( ). getBytes ( StandardCharsets . UTF_8 )) .build();生产者.发送(消息,confirmationStatus->闩锁.countDown());});

代理将跟踪此生产者的最后一个发布 ID。我们将看到这如何允许对消息进行重复数据删除。 ./mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$CreateEmptyStream'Connection...连接。尝试删除流(如果存在)。已删除流。创建“重复数据删除流”流。已创建流。现在我们在第二天运行我们的应用程序,有额外的 10 条记录。我们的应用程序没有第一次那么愚蠢:它使用生产者名称和发布 ID 进行重复数据删除。但它仍然从数据源读取所有记录: ./mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$Consume'Connecting...Connected.Starting 正在消费,按回车退出...message 0message 1message 2...message 9message 10message 11message 12...message 19 这次没有重复,很好!即使我们重新发布了前 10 条消息,broker 还是设法过滤掉了它们。它知道它应该忽略发布 ID 小于 9(第一次运行中的最后一个值)的所有消息。注意,即使它过滤掉了这些重复项,它仍然向客户端确认了它们。这比我们第一个最终出现重复的应用程序要好得多,但仍然存在一个问题:应用程序每次都重新发送所有消息。如果数据不断增长,应用程序每次运行将花费越来越多的时间。幸运的是,可以找出应用程序在上次运行中停止的位置。在本节中,我们将看到如何通过不仅使用重复数据删除,而且还查询代理以获取它发送的最后一个发布 ID,从而使发布应用程序更加智能。

./mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$CreateEmptyStream'Connection...连接。尝试删除流(如果存在)。已删除流。创建“重复数据删除流”流。已创建流。这个版本的应用程序不是最聪明的,但对于“第一天”来说已经足够了。发布应用程序需要在第二天做得更好,其中数据源现在包含 20 条消息。它可以使用 Producer#getLastPublishingId 方法向代理查询此生产者的最后一个发布 ID 为该流。应用程序可以添加 1到这个值,它会得到它的起点。然后它只需要从这一点选择记录,直到最后一条可用记录。这样它就不会从头开始重新发布。以下代码显示了如何做到这一点:生产者生产者 = 环境。生产者建造者 () 。流(“重复数据删除流”)。 name ("app-1") // 为生产者提供一个名字。 ConfirmTimeout ( Duration . ZERO ) // 永不停止重试。建造 ();长开始=生产者。 getLastPublishingId () + 1 ; // 获取最后的发布 ID 并添加 1 int messageCount = 20 ;记录( start , messageCount )。 forEach ( record -> { Message message = producer .messageBuilder ().publishId ( record . id ()) // 设置发布ID . addData ( record . content ( ). getBytes ( StandardCharsets . UTF_8 )) .build();生产者.发送(消息,confirmationStatus->闩锁.countDown());}); ./mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$PublishSmartDedupSecondDay'Connecting...Connected.Starting at 10Publishing Publishing 10 message with deduplication enabled.Messages确认?是 所以发布者从 10(9,第一次运行的最后一个发布 ID,+1)开始,发布 10(20,总共,- 10 个已经发布)新消息。我们可以检查流的内容:./ mvnw -q compile exec:java -Dexec.mainClass = 'com.rabbitmq.stream.Deduplication$Consume'Connecting...Connected.Starting消耗,按回车退出...message 0message 1message 2...message 9message 10message 11message 12...消息 19

我们在流中获得了预期的消息数量,但这次使用了优化的发布应用程序。需要生成应用程序的名称和发布 ID 才能启用重复数据删除 发布 ID 是一个严格递增的序列,它通常是给定消息的标识符(例如数据库记录的主键、文件中的行)应用程序应该查询他们用来从中断处重新启动的最后一个发布 ID 的代理