由Materialise支持的简单高效的实时应用程序

2021-01-21 03:03:14

在Web开发社区中,已经有明显的理由转向实现增量视图维护的框架,这是有充分理由的。逐步更新状态时,应用程序性能会更好,所需资源更少。使用Materialize,开发人员和数据分析人员可以利用现有的SQL知识,在数据处理管道中采用相同的事件驱动技术。在此博客文章中,我们将构建一个应用程序,以演示开发人员如何在Materialize的支持下为用户创建实时的,事件驱动的体验。

注意:这篇文章是我将Streaming TAIL编写到浏览器时实现的目标的实现。该帖子不是理解该帖子所必需的,但建议您熟悉TAIL命令。

在本文中,我们将使用文档入门指南中的演示作为构建最小的Web应用程序的基础。该应用程序允许用户查看对Wikipedia的编辑总数,以及排名前10位的编辑者的条形图。这是显示最终结果的动画:

想要自己运行此演示吗?太棒了!克隆我们的存储库,并按照说明自行运行。

在介绍如何构建它之前,让我们概述一下解决方案所需的属性。最终解决方案应该是:

基于推式–应由服务器启动更新,而不是让客户端轮询更新,仅当客户端状态已过时。

完整–无论何时加载应用程序,客户端均应提供一致且完整的数据视图。

无缓冲–对基础数据集进行更改后,客户端应立即收到更新,而不会在数据管道中造成任何延迟。

经济–更新的大小应取决于当前状态和期望状态之间的差异。此外,不应要求客户端从历史记录开始重播状态。

关注点分离–修改源数据的应用程序(写程序)无需了解使用数据的应用程序(读程序)的任何知识。

尽管其他应用程序有可能满足其中的一些属性,但我希望该应用程序将演示为什么提出的解决方案非常适合这种情况。关于此应用程序为何具有上述特性以及为什么其他解决方案可能无法满足上述要求的讨论,将在本文后面进一步讨论。

对于那些不熟悉我们的入门演示的人,以下是我们管道中的数据流:

查看下面的系统图,我们看到整个数据管道都包含在单个实例化实例中:

注意:如果启动了应用程序,则可以运行mzcompose ps来查看已启动的容器。

这个容器运行curl,以将Wikimedia的最近更改日志流式传输到与我们实例化实例共享的Docker卷中的名为lastestchanges的文件。

这个容器运行实例化实例,配置为尾部最近更改文件并维护我们的两个实例化视图:counter和top10。此实例中的视图的配置与《入门指南-创建实时流》中所述完全相同。

该容器运行一个Python Web服务器,该服务器承载我们JavaScript应用程序的代码,并将行的结果从TAIL转换为应用程序期望的批处理结果。

我们的示例应用程序是使用两个库(Tornado和psycopg3)构建的异步Python Web服务器。我想指出应用程序的三个组件:

使用TAIL命令订阅实例化视图更新的Python代码,并将面向行的结果转换为批处理。

注意:Materialize使用单词Batch来表示表示先前状态与所需状态之间的差异的数据结构。您可以将批处理视为“数据差异”。

为了有效地更新我们的客户端视图状态,我们希望通过websocket向所有配置的侦听器呈现一批处理流。我们将批次定义如下:

batch = {" insertted&#34 ;: [//要添加到我们的数据集中的行数组]," deleted&#34 ;: [//要从我们的数据集中删除的行数组],&#34 ; timestamp&#34 ;: 0 //实现定义的时间戳记}

当客户第一次连接时,我们将从历史记录的开始发送所有批次的压缩视图:

def add_listener(self,conn):"""将此连接插入到将在新消息中得到通知的列表中。"""#最新的查看状态,以使此侦听器追上到世界的当前状态。write_message({" deleted&#34 ;: []," inserted&#34 ;: self.current_rows," timestamp&# 34 ;: self.current_timestamp,})self.listeners.add(conn)#订阅批处理流

侦听器如何使用批处理对象引导其状态?如果我发现有一个特别适合该解决方案的属性,那么将第一批应用于空列表的属性作为我们的初始状态。这意味着初始化和更新是相同的操作。这个批处理对象非常有用,以至于D3的更新和Vega的change API希望更新以类似的形式出现。

但是,尾部的结果是面向行的。我们需要一些代码才能将行从行转换为批处理。这是所需转换的示例:

#我们需要转换此行流... 1608081358001 f -1 [' Lockal&#39 ;,' 4590'] 1608081358001 f 1 [&Epidosis&#39 ;,&# 39; 4595'] 1608081358001 f -1 [&Matlin&#39 ;,' 5220'] 1608081358001 f 1 [' Matlin&#39 ;,' 5221&#39 ;] 1608081359001 t \ N [' \\ N&#39 ;,' \\ N']#到此数据结构。timestamp = 1608081358001inserted = [(' Epidosis&# 39;,' 4595'),(' Matlin',' 5221')]已删除= [[' Lockal',' 4590'),(&#39Matlin',' 5220')]

让我们看一下用于订阅以查看更新并将行转换为批处理的代码。

要处理TAIL中的行,我们必须首先声明一个游标对象,该对象将用于无限期地遍历行。为了帮助我们的代码知道何时发布更新,我们在响应中要求提供进度标记:

async def tail_view(self):"""生成一个协程,该协程设置一个协程以处理来自TAIL的更改。"""与await self异步。 mzql_connection()as conn:async with await conn.cursor()ascursor:query = f" DECLARE cur CURSOR FOR TAIL {self.view_name} WITH(PROGRESS)" await cursor.execute(query)await self .tail_view_inner(光标)

现在,我们已经创建了一个交流渠道,可用于等待尾部查询的结果。每当我们的视图改变时,我们的应用程序将立即收到通知,我们可以从游标对象中读取行。 tail_view_inner实现了处理行并将其转换为批处理的逻辑:

异步def tail_view_inner(自己,光标):"""从TAIL中读取行,将其转换为更新并进行广播。""" inserted = []删除= []时为True:#阻塞,直到有新结果(未缓冲FETCH)等待游标。在游标中对(时间戳,进度,差异,*列)执行异步(f" FETCH ALL cur") :#progressed列用作同步原语,指示已读取所有#个更新行。我们应该发布此更新。如果progress:self.update(deleted,insert,timestamp)inserted = [] deleted = [] continue#通过创建" diff"简化我们的实现。如果diff< diff< 0:deleted.extend([columns] * abs(diff))elif diff> 0:inserted.extend([columns] * diff)else:raise ValueError(f"来自TAIL的错误数据:{row}")

现在我们有了一个批处理对象,我们将此更改应用于我们自己的内部VIEW并将更改广播给所有侦听器:

def update(self,delete,insert,timestamp):"""基于此差异更新我们的内部视图。""" self.current_timestamp =时间戳#删除已删除的r中已删除的所有行:self.current_rows.remove(r)#并添加已插入的任何行self.current_rows.extend(inserted)#如果配置了侦听器,则广播此diffif self.listeners :payload = {" deleted&#34 ;:已删除,"插入&#34 ;:已插入," timestamp":timestamp} self.broadcast(有效载荷)

设计决策:有经验的读者会注意到,通过在Python Web服务器中维护视图的内部副本,我们可以减少与实现实例的连接数。这是我在编写此代码时做出的严格可选的设计决策-与其他数据库相比,物化连接的重量非常轻。我们希望在某些用例中,您希望每个用户一个或多个实现连接。例如,考虑为每个用户个性化的仪表板提供临时的物化视图。

对于此应用程序,我选择减少连接是出于习惯而不是必要。例如,如果我们想为数百万的客户提供服务,它的确也可以实现更大程度的扇出。

现在,我们已经看过广播更新的代码,下面让我们展示我们的JavaScript代码如何使用这些批处理。我们的应用程序显示了两件事:总编辑计数器和前10个图表:

Total Edits Counter仅关心计数器视图中的最新值,该视图本身仅由一行组成。这意味着我们可以实现一个用于总计数的WebSocket侦听器,该侦听器仅从插入的行中读取第一行,并使用该行来更新我们的计数器HTML元素:

var path =" ws://" + location.host +" {{reverse_url(' api / stream&#39 ;,' counter')}}&#34 ;; var connection = new WebSocket(path); connection .onmessage = function(event){var data = JSON.parse(event.data); //计数器是一个单行表,因此每次更新应包含一个插入和/或一个删除(我们不这样做)关心)document.getElementById(" counter")。innerHTML = data.inserted [0] [0]}

前10个图表使用Vega-Lite绘制条形图。因为我们的批处理数据结构直接映射到vega.change方法,所以我们可以在Vegalite示例中遵循其流数据。我们确实需要编写少量代码来启用属性查找:

vegaEmbed(' + view_name,chart,config)。然后(function(chart){var path =" ws://" + location.host +" {{reverse_url(' api / stream&#39 ;,'')}}" + view_name; var connection = new WebSocket(path); function convert_to_subject(row){return { subject:row [0],count:parseInt(row [1])};}函数subject_in_array(e,arr){返回arr.find(i => i.subject === e.subject& i .count === e.count);} connection.onmessage =函数(事件){var data = JSON.parse(event.data); var insert_values = data.inserted.map(convert_to_subject); var delete_values = data.deleted .map(convert_to_subject); var changeSet = vega.changeset()。insert(insert_values).remove(d => subject_in_array(d,delete_values))); chart.view.change(' data&#39 ;, changeSet ).resize()。run();}});

就是这样!当收到新批次时,Vega / Vega-Lite仅更新已更改的元素并重绘我们的图表。现在,我们有了由实例化视图提供支持的实时图表。

在本节中,我们看到了如何构建真正的端到端,事件驱动的管道,以最大程度地减少构建实时用户体验所需的工作量。同步客户端状态的代码简单明了。轮询不会带来不必要的延迟,并且更新可以高效发送和处理。现在,让我们重新访问所需的属性,看看我们的工作方式,并将其与其他潜在解决方案进行比较。

从上面的示例代码中,我们可以看到我们的应用程序符合所需的属性:

基于推式–我们的Python服务器和Javascript应用程序会在可用时立即接收批处理,并通过现有连接发送。由于物化仅在视图已更改时才产生批次,因此仅在必须更改客户端状态时才触发更新。

完整–无论何时启动,Python服务器都会始终显示数据的完整视图。同样,无论何时连接,我们的Javascript客户端始终具有完整的数据视图。

无缓冲–一旦事件数据写入源文件,Materialize就会计算批处理更新。

经济–批量大小与先前状态和新状态之间的差异成比例。这既减少了通过网络发送的数据量,又减少了处理每个更新所需的工作量。客户端首次连接时,不需要从所有历史记录中重播状态。相反,客户端会收到一个已经压缩的当前状态视图。

关注点分离–编写数据,卷曲的应用程序对物化视图或我们的JavaScript应用程序一无所知。不管我们添加其他视图,将wikirecent流与另一个数据源一起添加,甚至更改现有查询都无所谓-无需修改我们的编写器。

人们长期以来一直在构建实时应用程序,而Materialize使构建这些应用程序变得简单而不受传统限制。其他解决方案的常见缺点包括:

如果没有增量视图更新,则应用程序必须不断查询数据库以获取查询结果。这将导致数据库上的负载和网络流量增加,因为必须每次都计算查询的全部结果。这也导致Web服务器上的负载增加,因为它们必须处理每个查询响应上的完整结果集。

如果没有在SQL中定义增量维护的视图,则每个实例化视图都将需要编写自定义流处理功能以及创建中间源和接收器。添加微服务将导致操作开销增加和部署复杂性。

当缓冲批次更新时(例如在ELT / ETL管道中),应用程序将在旧状态下运行。虽然很容易想到单个视图只有5分钟,但管道中的累积延迟可能会更糟。处理延迟导致应用程序呈现不完整和/或不一致的状态,尤其是在跨多个源联接数据时。这会降低客户对数据管道的信任。

没有增量视图更新,应用程序必须实现自己的逻辑以通过比较客户端和服务器状态来计算批次。这会导致逻辑重复,在这种情况下,您有一个用于初始更新的实现,而另一个有增量更新的实现。它还介绍了客户端断开或关闭连接后重新连接的情况。在应用程序中实现状态同步逻辑会带来额外的复杂性。

如果没有服务器告知客户端哪些数据已过时,则寿命长的客户端将被迫实施自己的逻辑以删除旧数据。尽管这可能仅适用于追加数据集,但大多数数据集都具有插入和删除功能。即使源是仅追加的,下游视图也可能不是仅追加的,例如top K查询。强迫客户端复制服务器的逻辑以删除数据会导致实施过程中的额外复杂性,并使向数据管道推出更新更加困难。

当数据库无法产生增量更新时,编写者可以直接通知侦听器基础数据已更改。这通常是使用诸如LISTEN / NOTIFY之类的辅助通道完成的,但是它也有其自身的缺点。写者实现产生增量更新所需的逻辑,或者读者必须在每个通知上获取整个数据集。另外,在存在数据流的情况下,即使是简单的数据流(例如我们的示例应用程序),确定通知谁也是一项艰巨的任务。

如果没有用于处理所有摄取数据的连接,大多数流处理系统将根据窗口的大小或数据的使用期限使数据过期。 在其他框架中,数据过期后,您将无法再加入该框架。 临时联接使得很难信任数据管道中的结果。 物化使编写实时,事件驱动的应用程序变得简单而直接。 这篇博客文章提供了一个示例应用程序,演示了如何使用TAIL语句构建实时的,数据驱动的应用程序。 我们的应用程序在实时应用程序中保持了一些理想的属性,同时避免了其他方法中存在的常见限制。 立即查看Materialize! 不同意我说的话? 还有另一种执行相同任务的方法吗? 我很乐意在我们的社区或直接与您联系!