在Elixir中的WebSocket客户端踩踏

2021-06-10 21:23:07

对于最近的一个项目,我需要使用STOMP协议通过WebSocket与外部服务器通信。在搜索和未能找到可以执行此操作的Elixir中找到任何库后,我正在考虑倒回Python的Stomp.py并使用Erlport与该过程进行通信。虽然这绝对是一个可接受的解决方案,但它也是一种过度复杂的解决方案,特别是因为STOMP是一个非常简单的协议,并且在Elixir中已经有一些库可以解析并创建STOMP消息。

事实证明,它真的很容易推出我自己的WebSocket客户端实现,使用STOMP传达。每个STOMP消息都有一个简单的结构:

根据服务器和客户端之间的交互,标题和主体可以是任何内容。并且只有一些客户端命令踩踏:

有了这个,虽然肯定可以自己解析和创建邮件,但我建议使用现有的测试库,该库已经开箱即用。我用过大象,但你可以使用任何其他图书馆甚至推出自己的实现。

同样,为了在Elixir中创建WebSocket客户端,有几个流行的实现。 Websockex是最容易开始的,特别是如果你不知道erlang。但如果你真的需要一个完全战斗的图书馆,还有枪。我将假设使用WebSockex,这是我使用的,并且在博客中也会更容易遵循。启动Web套接字客户端非常简单:

defmodule myapp .websocketclient确实使用Websockex"""通过调试:如果要启用追踪""" def start_link(%{url} = state)do opts = if(映射.get(陈述),[[:trace]],[])Websockex .start_link(URL,__module__,状态,选择选择)结束结尾

Stomp客户端中的第一步是发送帧以连接到服务器。我们可以使用have_connect:

在成功的连接上,服务器向我们发送连接的响应。让我们首先使用Handle_frame设置客户端以接收和解析STOMP消息:

Websockex def handle_frame({:text,text},状态)do logger .debug("句柄框架 - #{to_string(text)}")案例大象.message .parse(text)do {:好的,%大象.message {} =消息,_more} - > ACK(消息,状态)handle_stomp_message(消息,状态){:错误,:无效} - > Logger .warn("收到的帧无效。忽略..."){:确定,状态} {:不完整,消息} - > Logger .warn("收到的不完整框架。到目前为止解析 - #{检查(消息)}"){:确定,状态}结束def handle_frame(任何,州)do logger .warn(&# 34;忽略未知帧 - #{检查(任何)}"){:确定,州}结束

非常自我解释,我们收到框架,解析正在使用大象并致电我们的自定义句柄_stop_message来处理此消息。我忽略了所有不完整和无效帧,但如果您愿意,您还可以更新此以终止WebSocket连接。

使用我们的Settup在Handle_Frame中,处理连接的代码非常简单。我们还将在订阅开放后立即订阅两个频道。您需要根据您的要求自定义此步骤。

defp handle_stomp_message(%大象.message {:connected},状态)做订阅(," / something")订阅(," / something / else"){:好的,状态}结束defp订阅(ID,目标),Websockex .cast(self(),{:send_message,%大象.message {:subscribe,[destination,id]}})

你问的websockex.cast是什么?它就像一个常规的genserver.ctor,它向我们的进程发送了一个异步消息,这可以决定将其处理它喜欢在handle_cast中。在这里,我们正在处理两种消息,如:send_messages和:close。

Websockex def handle_cast({send_message,%大象.message {} = message},状态),{:回复,帧(消息),send} def handle_cast(:close,state),{:close,state |>地图.put(:close,true)} defp帧(%大象.message {} =消息),{:text,大象.message .fersage .format(消息)}

来自服务器上的订阅的消息将通过消息帧接收。处理它将非常具体于每个应用程序,因此这里是一个非常简单的通用块,假设正文是JSON编码并调用自定义句柄_subscription_message来处理该消息。我将把Handle_subscription_message的实现留给您。

defp handle_stomp_message(%大象.message {:message,body} =消息,状态)do case大象.message .get_header(消息,"订阅")do {:好的,} - > handle_subscription_message(:something,message,jason .decode!(body),状态){:好的,} - > handle_subscription_message(:something_else,message,jason .decode!(body),状态)其他 - > logger .warn("未处订阅#{other} - #{inspect(message)}"){:确定,状态}结束结束

要回复任何帧,我们所需要的只是返回{:回复,帧,状态}从句柄方法或使用websockex.cast与{send_message,frame}来达到我们的自定义Handle_cast实现以发送消息到服务器。

您可能已经注意到Handle_Frame中的ACK。这只是为了通知服务器我们收到了一条消息。以下是实现:

defp ack(%大象.message {} =消息,状态)do case eLephant .message .get_header(消息," messageId")do {:好的,id} - > WebSockex .cast(self(),{:send_message,message_send(%{" accknowledge",id},state)})_else - > {:错误,:id_not_found}结束ofd peffp mess_send(%{destination,id,body},do%大象.message {:send,[destination,id],if(body,jason .endode!(body), "")}结束

关于此事项的重要事项是,根据您的服务器实现,确认目标和MessageId标题字段可能会有所不同。

最后,一旦完成连接,您需要将断开框架发送到服务器以通知您打算关闭。基于STOMP协议,在发送断开连接后,您还应等待在实际关闭连接之前从服务器接收断开连接。

#disconnectwebsockexx .cast(self(),{:send_message,%大象.message {:disconnect,[]})#句柄收据defp handle_stomp_message(%大象.message {:procept} =消息,状态)do id = to_string( )案例大象.message .get_header(消息," precess-id")do {:确定,^ id} - > {:关闭,国家|>地图.put(:close,true)}其他 - > Logger .warn("无效的收据消息 - #{其他} - #{检查(消息)}"){:确定,状态}结束结束

{:close,state}的回复从上面的实现是足以启动套接字的关闭。我们还为状态内的关闭键放置了一个真实的值,以标记我们已启动套接字的关闭。如果您想陷入困境并根据这是正常的或异常关闭,则需要这是必需的。

websockex def iteminate(_reason,%{true}),退出(:normal)def终止(原因,状态)do#如果需要捕获任何原因logger .debug("套接字终止: - #{检查原因} - #{检查状态}")退出(:正常)结束

鉴于这是WebSocket客户端的非常自定义的实现,很容易按照此操作以创建自己的Stomp客户端。