流系统中的内部一致性

2021-04-19 22:01:03

免责声明:我曾经努力实现这一帖子中的系统之一。

这篇文章的核心消息是从分布式键值数据库的直觉,Don' t携带到做复杂计算的系统。最终一致的键值数据库可能会显示旧值。最终一致的流系统可能会显示任何对任何一组输入完全不可能的输出,并且只要新的输入继续到达,就不会收敛到正确的值。

要重新获得数据库的直觉,我们还需要内部一致性 - 一个属性I' LL定义下面。 I' LL还涵盖了一些类内部一致性失败,并演示了如何在野外重现这些内部,使用KSQLDB和Flink作为内部不一致的系统和差分数据流的示例,作为内部一致系统的示例。

输入是10M交易流,在10个不同的帐户之间移动。移动的金额总是1美元,使其更容易眼球错误。交易包含60个不同的时间戳(仿佛由某些常规批处理CDC生产),并且可以在最多10秒内耗尽顺序。

随机的 。 SEED(42)MAX_ID = 10000000 Transaction = [] ID范围内(0,max_id):second =((60 * id)// max_id)延迟=随机。统一(0,10)行= JSON。转储({' id':id和#39; from_account':随机。randint(0,9),' to_account':随机。randint(0,9), '金额':1,' ts':f' 2021-01-01 00:00:{秒:02d} .000',})交易。附加((第二+延迟,ID,Row))交易。 for()for(_,id,行)交易:打印(f' {id} | {行}')

对于支持它的系统,使用5秒的水印读取交易以应力处理后期数据。

我们想要做的计算非常简单 - 只是跟踪每个帐户的当前余额 - 但它强调了大多数失败模式I' LL在下一节中描述。

创建视图信用作为选择To_Account As帐户,总和(金额)作为To_Account的Transaction Group的Credits;创建视图借记作为Select from_Account As帐户,总和(金额)作为来自事务组的借方来自from_account;创建视图余额作为选择积分。账户账户,信用 - 借记贷款差额,借记贷方。帐户=借记。帐户 ;

外接和自我连接往往很难在流式设置中进行右侧,所以包括一个(有些毫无意义的)查询测试两者。由于连接的两侧具有相同的输入,因此输出不应包含空值。

创建视图ofter_join作为选择t1。 ID作为id,t2。从(从事务中选择ID)作为T1左连接(从事务中选择ID)作为T2ON T1,ID为id。 ID = T2。 ID ;

由于金钱只是被移动,从未创造或销毁,所有余额的总和应该永远是0.这给了我们一个非常清晰的不变性,用于检测一致违规。

这是一个非常简化的代码版本,即我在一个真实的系统中编写的代码,我们需要确保没有客户发现一系列会产生金钱的一系列行动,例如通过造成舍入符合他们的青睐。如果总数是非零,它会触发警报。

有一点值得注意的是,此查询不会窗口任何输入,将其牢固地放在地图的低时间位置侧,其中一致性更难以维护。

如果我们停止提供新的输入,则系统最终是一致的,它最终为到目前为止提供的一组输入的正确输出。

如果每个输出是到目前为止提供的输入的某些子集的正确输出,系统是内部一致的。

对于我们的示例,这意味着如果在处理所有输入之后总数最终变为零,则最终将是一致的,如果它永远不会输出总计,则在内部一致。

内部一致性允许系统具有任意处理延迟。它还允许丢弃输入 - 这不是最终一致的,而是在许多应用程序(例如分析)中,如果有一些小百分比丢失,那么它并不是什么大不了的。

最终的一致性要求输出最终如果输入停止到达,则允许系统产生废话输出,只要输入到达。如果我们始终拥有新的输入,那么它永远必须收敛到正确的输出。

在实践中,我们通常希望内部一致性和一些更强烈的最终一致性形式,包括活动担保。我们希望系统产生正确的输出,我们希望一些保证每次输入最终都会被处理。

这不是违反内部一致性的详尽概要,只是主要原因我'迄今为止看到的。

结合多个流时,它与同步它们的重要性,使得每个输出反映相同的输入集。

在我们的示例中,如果学分和借记之间的连接不同步,那么我们可以使用当前信用值和借记的过去价值计算余额,有效地创建金钱。

这将在内部不一致,但仍然最终一致 - 如果我们停止添加新事务,则流最终将再次恢复SYNC并返回0.但是如果输入是无限制的,则将一条流始终领先于另一个流程永远不要返回0。

这些流的这些融合出现在任何SQL查询中,它在像argmax这样的自引导计算中使用相同的表,以及像追随者的追随者一样的图形遍历。它们还出现在我知道将SQL子查询编译为流操作的最佳方法。

处理事件时间通过超出订单数据时,可能会发生非常相似的问题,其中连接到连接中的两个输入流需要通过事件时间同步。下面的所有系统患有同步故障的所有系统也会遭受类似的发生故障,以便进行事件时间加入。

许多流媒体运算符是逻辑上单调的 - 接收新输入只能使它们发出新的输出。这使得生活变得容易实现最终的一致性。

但是一些运营商是非单调的 - 特别是外连接和聚集体。这些需要更多的护理。

为了产生正确的输出,总计需要处理原因上的4个事件。如果它一个接一个地处理它们,则输出将是:

#处理删除借记(3,712)删除总(0)插入总数(712)#处理插入借记(3,713)删除总数(712)插入总数(-1)#处理删除贷记(7,422)删除总数(-1)插入总数(-423)#处理插入码(7,423)删除总数(423)插入总数(0)

要在内部一致,总计需要等到它'在发出输出之前看到了与原始事务对应的所有更新。

在上面的输出中,总共通过一系列中间值,然后返回到0.这些中都不是总计的正确(甚至可能)值,但此流不区分对校正值的更改和中间输出的校正。 。

最终对本值的疑问,它'足以在最新值上运行。但是对于价值历史的疑问(例如"这个帐户余额是负面的")它'除非更新流区分中间输出,否则不可能是最终的,除非更新,甚至可以将中间输出与正确的输出区分开来。

同样,在系统的边缘'如果您不知道哪些是正确的,则无法对输出采取行动。您可能会向客户发出警告,以便在其帐户中删除,仅向后来发现这只是一个瞬态不一致。

要在内部一致中,需要有一些方法来确定输出值是否正确。这可以是例如进度陈述的形式,告诉您给定时间戳的输出不再更改。

这更像是一个边缘案例,但与流同步不同,这类故障可能导致可以在不重新启动整个拓扑的情况下修复永久不一致。

在大多数系统中,输入可以从订单中到达。某些操作,尤其是窗口聚合,在他们可以提供规范答案之前需要查看所有输入。要处理这一点,我们可以使用水印增强输入流 - 说&#34的消息;现在已经看到了时间< t&#34的所有输入;

在实践中,我们经常可以保证我们实际上看到了所有输入,所以这些水印刚刚在一些合理的截止时间后插入,我们删除了水印后到达的任何消息。

这本身并不违反内部一致性,因为我们允许删除消息。但是一些流系统(特别是Kafka Streams)在各个操作员中插入水印而不是系统的边缘。这允许不同的运算符在其流时间等于不同的情况下丢弃输入的不同子集。这可以通过看到输入的不同子集来引起输入的不同子集,通过看到不同的顺序,或者某些先前的操作员使用壁时钟超时而不是流时间超时,而不是流时间超时。当比较来自这些操作符的输出流或组合时,不一致确保不一致。

要在内部一致中,流系统只能拒绝边缘的输入,使得所有操作员正在研究从同一组上游输入派生的数据。

看着一些理论失败,让' s看他们在实践中实际发生的频率。

我测试了各种流行的流系统,但最终只有一些很少可以轻易表达跑步的例子。其他人缺乏对未展开的聚集体和联合的支持,或者在火花结构流媒体的情况下,对其使用有严重的限制。

差异数据流是用于编写一致数据流程的库。正如您从代码中看到的那样,它更像是用于实现流系统的一个工具包,而不是直接使用的东西。

它提供了具有各种关系运算符的时变桌子。它跟踪每个事件的时间戳,并通过时间戳同步连接。在系统的边缘需要水印,用于控制非单调运营商的排放。

对于测试,我从文件读取输入并将输出写入文件。读取输入时,我任意选择每批次使用1000个交易。我也使用最新输入后5秒的水印。

差分数据流集合的输出包含一流的插入物,删除和水印。

$ head-n 20差分数据流/原始结果/余额没有更新时间戳< 160945919500000000000080000000000000000000000000000000没有更新时间戳< 16094591960000000000008000000000没有更多的时间戳< 160945919700/160945919700000000000没有更新时间戳< 160945919800/160945919800/16000000没有更新时间戳< 160945919900/160945919900000000000没有更多的更新时间戳< 1609459200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 1x(8,196)在1609459200000000000000000000000000000000000插入1x(5,-47)的插入1x(5,-47),160945920000000000000插入1x(7,-37),160945920000000000000插入1x(2,2,96),在160945920000000000000更新时间戳<删除1x(0,62)160945920100000000000插入1x(0,214),1609459201000000000插入1x(3,-66),160945920100000000000

一旦水印已通过给定的时间戳,就不会获得更多更新。这允许运营商在接收输入后立即工作,同时仍跟踪哪些输出是一致的。对于此测试,我呼叫整合在输出上,累积并合并冗余更新,直到水印通过它们,保证它只会发出一致的更新。

I' Ve还包括物以来的代码,它是基于差分数据流的顶部,并在相同的一致性模型下运行,但增加了一个SQL接口和一系列源和源以及处理与外部系统的交互的源,包括分配时间戳和水印。

物质化'当前源/宿系统是占位符,等待重新设计到正交的SQL运算符中。同时,唯一提供用于从任意输入提取事件时间的唯一来源是CDC协议,我懒得发射。因此,我使用了文件源并使用了实验时间过滤器功能进行了言论。

这意味着我可以' t使用水印等,与差分数据流不同,没有交易被拒绝迟到。除了实现的表现得完全像差分数据流,所以我在结果部分中单独覆盖它。

Kafka Streams正是听起来像什么样的媒体系统,内置了Kafka。 KSQLDB建于Kafka Streams之上,并为SQL的方言添加支持。

都提供事件流和时变桌子。可以使用关系连接,但仅提供流之间的最佳同步。最佳努力由许多配置变量控制,包括MAX.Task.IDLE.ms,它是一个挂载运算符将等待匹配输入的长时间的挂钟(!)超时。默认值为0 ms。我可以找到实际加入算法的最佳解释是在这个问题中。如果我理解正确,这意味着加入不是最终一致 - 您可以拥有网络打嗝,超时连接的一侧,然后切勿发出这些输出。

既不跟踪水印。作者争论了持续改进的模型。这使得不可能知道在输出上是安全的,因为当他们稍后添加了抑制运算符时,他们本身就是安全的。抑制操作员允许手动抑制非单调算子的早期排放。它和#39; s类似于差分数据流和#39; s巩固,但由于它缺乏水印,它可以' t保证输出一致,只能增加赔率。作为每次运算符,抑制还会将类似的失败模式暴露于早期部分中提到的那些关于水印中不一致的拒绝。似乎也有一种正确抑制非窗口外连接的方法,因为输出流不得不访问连接到连接的流次数。抑制操作员也尚未在Kafka Stream中提供。

不幸的是,我已经'它能够说服Kafka Streams为我们的运行示例产生任何输出(Kafka-12594)。人们正在生产中使用Kafka Streams,这显然是关于我的配置或环境必须不寻常的事情。但经过几个星期的努力,包括对汇合懈怠的对话,并在Kafka邮件列表中寻求帮助,我没有进展。

为了避免任何错误配置,我的kakfa流努力努力i' m通过confluent提供的docker图像使用ksqldb,其中包括zookeeper和kafka以及所有的默认配置。这是提供以下所有结果的原因。

输出是一系列键值对。如果有多个相同键的更新,则每个更新都替换了先前的值。已删除的键由具有空值的更新表示。

$ head-n 20 ksqldb /原始结果/余额createque:1609459200000 {"余额" :-47.0} CreateTime:1609459200000 {"余额" :-49.0} CreateTime:1609459200000 {"余额" :-48.0} CreateTime:1609459200000 {"余额" :-47.0} CreateTime:1609459200000 {"余额" :-58.0} CreateTime:1609459200000 {"余额" :-47.0} CreateTime:1609459200000 {"余额" :-51.0} CreateTime:1609459200000 {"余额" :-55.0} CreateTime:1609459200000 {"余额" :-59.0} CreateTime:1609459200000 {"余额" :-39.0} CreateTime:1609459200000 {"余额" :-47.0} CreateTime:1609459200000 {"余额" :-58.0} CreateTime:1609459200000 {"余额" :-55.0} CreateTime:1609459200000 {"余额" :-48.0} CreateTime:1609459200000 {"余额" :-49.0} CreateTime:1609459200000 {"余额" :-51.0} CreateTime:1609459200000 {"余额" :-59.0} CreateTime:1609459200000 {"余额" :-47.0} CreateTime:1609459200000 {"余额" :-39.0} CreateTime:1609459200000 {"余额" :-47.0}

当然,ksqldb的唯一选择是使用Kafka来源和汇。我将输入交易管制到Kafka-Console-Producer中,并使用Kafka-Console-Consumer读取汇。

Flink被分成两个API - DataStream API强烈关注高时的位置问题,因此CAN' T表达我们的运行示例(Edit请参见结束时),而表/ SQL API提供时变表与所有熟悉的关系运营。

像差分数据流轨道从系统边缘跟踪事件时间和水印,但与差分数据流不同,它不会为所有操作使用它们。对于大部分表API,情况似乎与Kafka Streams&#39相同;持续的细化模型。

文件源似乎以单个批处理加载文件的当前内容,然后忽略任何将来附加,所以它' s不可用于测试流行为。相反,我将Kafka用于两个来源和汇,具有与KSQLDB相同的设置。我将输出转换为流之前卸下,以确保捕获所有更新。我还在交易输入上设置了5S尾随水印。

$ Head-N 20 Flink-table /原始结果/余余插入7,16.0插入件6,10.0插入5,8.0插入4,6.0插入9,12.0删除7,16.0插入3,9.0插入7,17.0删除3,9.0删除5,8.0插入3,8.0插入5,9.0删除9,12.0删除4,6.0插入9,11.0插入4,7.0删除3,8.0删除9,11.0插入3,7.0插入9,12.0

对于每个系统,我立即将输入转储回磁盘以查看所取得的内容。

我设置了差异数据流以拒绝5S水印后的任何事务,这是大约1/3的数据集。 Flink也有一个5S尾随水印,但它并不拒绝非窗口计算中的任何输入。 当我尝试与Kafka Streams进行实验时,我有时发现accepted_transactions中丢失的行。 Hans-Peter Grahsl善意解释说这是因为: 当Kafka Streams写回Kafka时,它使用事件时间(如果设置为Kafka Log Timestamp)。 我的数据生成器使用了固定时间戳,到这一点是过去几个月。 除非明确指定的Kafka默认为7天保留期,否则将自由留给非确定性垃圾收集Accepted_Transactions主题。 此视图的输出不应包含空值,因为双方都具有相同的ID。

如果我们在Flink中尝试此操作,它会抱怨输入表包含必须使用的事件时间列。 有两种方法可以解决这个问题。 tenv。 executesql(串。加入(" \ n"" create diew suild_join_without_time(id,theor_id)作为","选择"," t1.id ,t2.ID作为其他","来自"(从事务中选择ID)为T1","左加入","( 从事务中选择ID)作为T2",""," t1.id = ......