自制的ECTO和灵丹妙药分析

2020-07-17 03:13:04

对于Dashbit网站,我们希望尽可能避免跟踪用户。这意味着没有cookie,不幸的是,大多数分析都使用cookie进行跟踪和/或指纹识别。但是,我们仍然希望看到我们网站上的哪些页面经常被访问。为此,我们决定推出我们自己的分析系统。

在本文中,我们将介绍如何使用Ecto upserts实现分析系统,以及如何使用Elixir注册表和Elixir进程来减轻数据库的压力。

想法非常简单:每次有人访问页面时,我们都会将此信息存储在数据库中。但是,我们不需要在访问发生的瞬间跟踪每个访问。对我们来说,跟踪一个页面一天的访问量是完全可以的。因此,每次在给定日期访问页面时,我们都会尝试在数据库中插入一个条目。如果条目已经存在,我们将改为更新其计数器。

幸运的是,这可以通过ECTO中的向上插入来完成。让我们首先定义数据库资源的架构:

defmodule MyApp.Metrics.Metric使用Ecto.Schema@primary_key false schema";metrics";do field:date,:date,primary_key:true field:path,:string,primary_key:true field:Counter,:Integer,Default:0 End End。

它有三个字段:日期、页面路径和计数器(访问次数)。日期和路径构成复合主键。我们的迁移如下所示:

defmodule Dashbit.Repo.Migrations.CreateMetrics确实使用Ecto.Migration def change do create table(:Metrics,PRIMARY_KEY:FALSE)do add:date,:date,primary_key:true add:path,:string,primary_key:true add:Counter,:Integer,Default:0 End End End。

defp upsert!(路径,计数器)是否导入Ecto.Query Date=Date。UTC_TODAY()QUERY=FROM(m in Dashbit.Metrics.Metric,UPDATE:[INC:[COUNTER:^COUNTER]])Dashbit.Repo.。插入!(%Dashbit.Metrics.Metric{Date:Date,Path:Path,Counter:Counter},On_Confliction:Query,Conflicts_TARGET:[:Date,:Path])End

上面的代码执行向上插入,将页面中的访问次数递增COUNTER的值,该值通常为1。如果条目不存在,则立即创建一个条目。

这是我们分析的核心。这是一个非常直截了当的解决方案,但它对接受我们所有写入的数据库有很强的要求。虽然大多数应用程序严重依赖数据库,但分析系统是我们网站中唯一使用数据库的地方,因此我们认为,即使在与存储层交谈时出现错误,显示一篇文章(如这篇博客文章)也很重要。为了解决这个问题,我们决定将上方插入移动到单独的进程。

如上一节所述,我们希望将分析代码完成的所有数据库写入移到单独的进程中。到目前为止,我们对我们的解决方案的另一个担忧是它将如何处理过载。如果有一个巨大的流量高峰,我们会不会最终给数据库带来太大的压力?从这个意义上说,批量写作是个好主意吗?

老实说,我们的应用程序使用SPAKS就可以了。多亏了Phoenix,我们的大多数页面加载都在数百微秒内,而且我们的数据库使用率极低。另一方面,这样一个小项目是一个完美的实验机会,所以我们决定探索一下,如果我们以异步批处理方式执行写入,我们的分析解决方案会是什么样子。

这是我们想出来的。每次用户访问页面时,我们都会生成一个药剂进程,跟踪对该页面的所有访问。如果所述页面已经存在进程,我们将改为向现有进程发送消息。此过程的目标是收集内部时间内的所有访问,在X秒后写入数据库。

我们为流程定义一个模块,并将其声明为GenServer。我们也说这个过程是:暂时的。也就是说,如果它死了,我们不希望主管重新启动它。这是因为我们假设,如果进程死亡,我们为每个页面动态生成进程的逻辑最终会启动一个新的进程。

init回调陷阱退出并将进程状态设置为{path,0}。第一个元素是页面路径,第二个元素是页面访问量。

我们的流程应该能够接收到:Bump消息。每当我们需要撞击计数器时,都会发送此消息,并由HANDLE_INFO回调处理:

@impl true def handle_info(:bump,{path,0})do Schedule_upsert(){:noreply,{path,1}}end@impl true def handle_info(:bump,{path,count})do{:noreply,{path,count+1}}end。

如果我们在页面没有访问权时收到:bump(即计数器为零),我们会将计数器增加到1,并且我们还会安排一个upsert事件,因此我们最终会将这些访问写入数据库。如果计数器大于0,我们只需调整它并返回更新后的状态。

defp Schedule_upsert()进行处理。Send_After(self(),:upsert,Enum。随机(10.。。20)*1_000)end@impl true def HANDLE_INFO(:upsert,{path,count})do upsert!(path,count){:无回复,{path,0}}end defp upsert!(path,count)do#功能与上一节end相同。

函数的作用是:将一条消息调度到当前进程(self())。消息将被命名为:upsert,它将以10s到20s之间的随机值传递。我们选择随机值的原因是为了避免同时产生不同页面的多个进程,并且它们都同时写入数据库的情况。

接下来,我们定义另一个HANDLE_INFO子句,这次用来处理Scheduled:upsert消息。这个子句只调用upsert!函数,并将状态重置回{path,0}。这使得,一旦有新的凸起,我们将安排新的上升。

最后,我们实现Terminate回调,该回调将在应用程序关闭时调用:

@impl true def Terminate(_,{_path,0}),do::OK def Terminate(_,{path,count}),do:upsert!(path,count)end。

如果我们的应用程序正在关闭,我们的Worker中可能有挂起的写入,因此我们希望将它们作为终止逻辑的一部分发送到数据库。需要记住的一件重要事情是,除非您捕获出口,否则默认情况下关机时不会调用Terminate回调。这就是我们在init函数中调用Process.flag(:trap_exit,true)的原因。

我们刚刚实现的流程提供了我们到目前为止拥有的所有需求:写入现在是异步的,因为它们发生在单独的进程中,并且它们也是批处理的,使用介于10s和20s之间的间隔。我们需要实现的最后一步是在用户浏览网站时实际动态地产生这些进程。

为了生成和查找每个页面的进程,我们将使用Elixir的注册表。我们还需要一个动态监控器,它将成为所有工作进程的父进程。让我们在溢出度量模块中实现此逻辑,同时实现我们的凹凸(页面)函数。

我们的Dashbit.Metrics模块是一个Supervisor,它将有两个子级:注册表和所有工作者的Supervisor。由于工作进程是动态启动的,因此当请求到来时,我们将使用DynamicSupervisor。为方便起见,我们将工作进程、注册表进程和动态监管进程的名称存储在模块属性中。

定义START_LINK(_Opts)执行Supervisor。start_link(__module__,:OK,name:__module__)end@impl true def init(:OK)do Children=[{Registry,Key::Unique,Name:@Registry},{DynamicSupervisor,Name:@Supervisor,Strategy::One_for_One}]Supervisor。初始化(子项,策略::One_for_All)结束。

当IS_BINARY(PATH)DO PID=案例注册表时,定义Bump(PATH)。Lookup(@Registry,path)do[{pid,_}]->;pid[]->;case DynamicSupervisor。START_CHILD(@Supervisor,{@Worker,Path})do{:OK,PID}->;PID{:错误,{:已启动,PID}}->;PID End End发送(PID,:Bump)End End。

bump函数在注册表中查找给定路径是否有进程,并返回其进程标识符(PID)。如果不存在,我们要求工人主管动态启动一个工人。我们预计start_Child会产生两种可能的结果:

我们需要第二个分支来解决潜在的争用情况,在这种情况下,首次访问页面的两个用户将多次尝试生成相同的Worker。一旦我们找到PID,我们就向它发送:Bump消息。

我们快到了。只剩下两步了。首先,我们需要将Worker配置为在启动时进行自我注册。这是通过START_LINK函数完成的。让我们返回到Worker并添加以下内容:

现在我们只需要启动Dashbit.Metrics监管树。这通常在您的应用程序监管树中完成,通常位于“lib/my_app/application ation.ex”中:

就是这样。现在,每当用户访问页面时,我们只需要调用Dashbit.Metrics.bump(Path),其中path是当前页面地址。在我们的示例中,我们只存储路径,没有主机,也没有查询字符串)。如果您使用的是Plug,则可以从connec.path_info字段构建它。我们还仅在成功呈现状态为200的页面时执行写入。总体而言,我们的颠簸代码如下所示:

Plug:Bump_Metric defp Bump_Metric(conn,_opts)do register_Beast_send(conn,fn conn->;if conn.。状态==200执行路径=";/";<;>;枚举。加入(连接。PATH_INFO,";/";)Dashbit.Metrics。凹凸(路径)结束(Conn End)结束。

在本文中,我们介绍了一个最小的分析系统,它使用ECTO、GenServer和Elixir的注册表,可以异步和批处理地执行写入。使用注册表动态生成映射到不同资源(每个资源都有自己的生命周期)的进程,可以在许多不同的场景中使用。

我们的解决方案中的一个重要方面是,在为页面创建进程之后,它将保持活动状态,直到有新的部署。这对我们是有效的,因为我们只有不到100页,所以我们知道最大进程数必然是一个非常低的值。

尽管由于Erlang VM,Elixir进程是轻量级的,但是如果我们有大量的页面,比如数百万个页面,最终可能会有数十万个未使用的进程。在这种情况下,我们会稍微更改我们的解决方案,以便在每次插入后终止该进程。大致是这样的:

@impl true def HANDLE_INFO(:upsert,{path,count})执行#我们首先取消注册,以便停止接收新的#消息。然后,我们计划在处理完所有#条挂起的消息后停止,以最终在终止时进行插入。注册表。取消注册(@Registry,path)send(self(),:stop){:noreply,{path,counter}}end@impl true def handle_info(:stop,{path,count}){:stop,:shutdown,{path,count}}end。

就是这样,我们希望您喜欢这篇文章,并学到了一两件可能对您的下一个项目有用的东西!