将Apache光束管道部署到Google Cloud数据流

2020-10-27 01:47:03

最近,Posh的平台团队一直在构建一个分析仪表板,以帮助我们的业务用户可视化产品使用情况。Posh的主要产品是客户服务聊天机器人,因此我们的分析仪表板允许我们的业务用户分析我们的机器人为客户提供的服务有多好。作为这个分析仪表板的一部分,我们需要构建一个数据管道来实时处理和存储用户事件。

我们的数据管道是使用Apache Beam构建的,并在Google Cloud Dataflow上运行。如果您不熟悉这些工具,让我来解释一下!来自Beam主页:“Apache Beam是一个开放源码的统一模型,用于定义批处理和流数据-并行处理管道”。更详细地说,BEAM项目由一系列数据处理功能组成,称为“转换”。转换构成了批处理或流数据的数据管道-因此得名“光束”,批处理和流的混合体。管道是通过它们的一个开源SDK使用这些转换构建的;在我们的例子中,我们使用的是Python SDK。一旦建立了管道,它就由支持的分布式处理后端执行-数据流被用作我们的管道的后端。

在较高级别,我们的管道将用户事件作为输入,处理该事件,并将其存储在后端数据库中。客户事件通常由BOT交互组成,如开始聊天、提问等。然后,当我们的业务用户想要分析他们的BOT的性能时,他们可以通过我们的分析仪表板进行查询,分析仪表板将从我们的后端数据库返回相关数据。请注意,我们的聊天机器人不断输出数据,这意味着到我们数据管道的输入是无界的,并且我们的管道是流管道而不是批处理管道。

既然我们已经了解了基础知识,那么让我们来讨论一下部署。一旦我们编写了一个在本地工作的管道,就到了部署我们的管道并让它在云中运行的时候了。

数据流模板是在Google Cloud中打包和准备管道的一种方式。一旦准备就绪,就可以使用Google Cloud控制台、gcloud命令行工具或rest API调用来运行管道。与非模板化数据流部署相比,模板化部署具有许多优势。例如,能够在没有开发环境及其关联依赖项的情况下运行您的管道。这对于自动化部署非常有用,例如重复的批处理作业或从CI/CD管道触发的作业。

有两种不同类型的模板。对于Classic Templates,当管道运行时,Apache Beam SDK会创建一个JSON模板文件,并将其上传到Google Cloud Storage。该模板文件基于管道代码中的转换定义了一个执行图-一组数据处理步骤及其之间的路径(稍后将详细介绍)。相比之下,使用Flex模板,开发人员将其管道打包到Docker映像中,然后使用gcloud命令行工具构建模板规范文件并将其保存在项目的Container Registry中。只有在执行模板时才会创建执行图。

两者之间的一个关键区别是,因为Flex模板不会自动创建执行图,所以您可以在运行模板时对执行图进行小的更改。例如,如果要执行具有不同接收器文件格式的多个作业,则可以使用相同的基本模板文件,但在运行时修改执行图。请注意,虽然只有Flex模板允许更改执行图,但Classic和Flex模板都允许在执行时自定义运行时参数。

最终,我们将经典模板用于我们的管道。我们发现它们易于使用,并且不需要Flex模板提供的附加功能。

Python-m MODULE_NAME\-RUNNER DataflowRunner\-Region US-East1\-PROJECT PROJECT_ID\-STAGING_LOCATION GS://存储桶名称/STAGING\-TEMP_LOCATION GS://存储桶名称/TEMP\-TEMPLATE_LOCATION GS://存储桶名称/Templates/TEMPLATE_NAME。

此命令将运行您的管道模块,创建一个模板文件,并将其保存到Google Cloud Storage的TEMPLATE_LOCATION标志指定的位置。

STAGING_LOCATION标志指向DataFlow存放执行作业所需的代码包的云存储路径。TEMP_LOCATION标志是数据流的云存储路径,用于存放在管道执行期间创建的临时作业文件。请注意,临时位置包含执行模板所需的文件,因此即使在创建模板之后也不应删除这些文件!

如前所述,梁管道可以由许多不同的后端执行。当在本地运行我们的流水线时,使用Direct Runner在我们的本地机器上执行流水线。要创建数据流模板,使用的流道必须是数据流流道。

如果希望管道读入一组参数,可以使用Apache Beam SDK类来定义管道选项。在上面显示的模板创建命令中,传入的各种标志将被读入并解析为PipelineOptions,然后可以用来指示管道如何处理数据。要指定自定义选项,可以使用add_argument()方法,该方法的行为与Python的标准“argparse”方法相同,用于解析命令行选项。在下面的代码片段中,您将看到一个示例,说明如何指定您自己的自定义选项MyOptions,然后可以在您的管道内使用这些选项,在本例中指定管道应该从哪里读取输入。

将APACHE_BEAM作为BEAM从apache_beam.options.Pipelineoptions导入PipelineOptions类MyOptions(PipelineOptions):@classmethod def_add_argparse_args(cls,parser):parser.add_argument(';--input';)parser.add_arument(';--output';)#create the options object options=PipelineOptions()#指定预期的选项来自我们的自定义选项类my_options=options.view_as(MyOptions)#创建管道-使用beam.Pipeline(options=options)进行简单的读/写转换。Pipeline(options=options)as Pipeline:Lines=Pipeline|>;>;beam.io.ReadFromText(my_options.input)line|';write files';>;>;beam.io.WriteToText(my_options.output)行|';写文件';>;>;beam.io.WriteToText(my_options.output)。

通过命令行调用此模块时,命令中指定的标志将作为PipelineOptions读入。因此,例如,在前面显示的模板创建命令中,您可以指定一个输入选项,并且当在命令行中运行模块时,您可以为该选项赋值:--input=/path_to_input。

对于每个自定义选项,您还可以指定默认值和帮助文本。如果没有为选项赋值,将使用默认值,并且当用户将-help作为命令行参数传递时,将显示帮助文本。

如前所述,这些选项定义了指定管道行为方式的管道参数。管道可以接受常规参数和运行时参数。虽然常规参数可以在创建模板时指定,但运行时参数仅在管道执行期间可用。运行时参数在管道构造期间不可用,因此不能使用参数值更改管道的工作流图。如果您事先不知道值,并且只想在执行管道时分配一个,则应使用这些参数。

如果要利用运行时参数,可以在定义管道选项时使用ValueProvider类。为此,请使用ADD_VALUE_PROVIDER_ARGUMENT()替换add_argument(),如下所示:

需要注意的是,如果您不使用ValueProvider作为运行时参数,那么当您尝试在运行时提供这些参数时,Dataflow将不会使用这些参数。

好的,很好,我们有一个创建模板的命令,以及一个指定管道参数的方法。但是,如果我们有很多参数,并且我们想要验证它们是否被赋予了正确的值,该怎么办呢?这就是元数据文件的用武之地!元数据文件允许您在通过数据流控制台部署时指定参数要求。您可以使用布尔标志来指定参数是否为可选的,并使用正则表达式来验证参数值。

下面是我们的一个管道参数的示例规范。它使用正则表达式指定SESSION_EXPIRY标志必须是整数值,并且因为没有指定isOptional标志并且缺省为false,所以它声明该参数是必需的。

{";名称";:";SESSION_EXPIRY";,";help Text";:";,";LABEL";::";会话到期分钟";,";regexes";:[";^[0-9]+$";]}。

元数据文件应为JSON格式,并命名为<;template-name>;_METADATA,不带文件扩展名。元数据文件应该存储在与模板文件相同的Google Cloud Storage存储桶文件夹中。

在我们的模板创建脚本中,在我们运行创建模板的命令之后,我们立即运行一个脚本,该脚本上载与正在运行的管道相对应的元数据文件。

最后,让我们讨论一下依赖关系。在本地运行管道时,管道运行所需的包很容易获得,因为它们安装在本地计算机上。但是,当在云中运行时,您必须确保这些包在远程数据流VM上可用。为此,我们创建了一个setup.py文件,在构建DataFlow模板时,Beam管道使用该文件来打包依赖项。在运行创建模板文件的命令时,我们将此setup.py文件的路径作为setup_file标志进行传递。请查看本指南,了解分步指导。

执行模板文件的方式有很多种。通常,我们使用Google Cloud Console从Google Cloud Storage存储桶中选择模板文件,如下图所示。

根据前面创建的元数据文件中指定的条件,可以通过控制台输入管道参数。但是,如果您更喜欢从命令行启动作业的实用程序,则可以改用gcloud命令行工具。为了部署、构建我们的管道,我们在本地开发环境中运行gcloud dataflow job run<;job_name>;命令-类似于下面显示的示例命令。如果您不想花费时间在数据流控制台中手动输入参数,则在本地运行此命令可以使您的工作更轻松,因为您可以将该命令与所有参数一起保存。尤其是当您调试本地运行良好但在Dataflow中远程运行时失败的管道时,使用gcloud工具部署可以大大节省时间。

在我们的CI/CD管道中,我们有一个将更新的模板文件推送到Google Cloud Storage的手动步骤。如果我们更改了管道代码并准备部署新版本的管道,则手动触发此步骤。当触发此步骤时,我们的CI/CD管道运行模板创建脚本,该脚本创建并暂存一个模板文件。此CI/CD管道为更新模板文件建立了一个更受控制的过程,并确保用于构建模板文件的环境保持一致。

一旦管道启动并运行,我们将使用Google Cloud控制台监控工具检查其状态。您可以看到可视化的执行图,其中显示了流水线中的作业之间的数据流。对于每个作业,您可以查看进出的数据量、作业的工作时间以及其他有趣的指标。您还可以使用这些指标创建警报,以帮助检测和通知您潜在的管道故障。我们已经使用了一小部分可用的监控工具,所以如果您有兴趣了解更多信息,我会让您查看这份深入指南。

我们希望您喜欢这篇关于在数据流中部署光束管道的指南!如果我们遗漏了什么,或者您对如何更好地使用数据流有什么建议,请告诉我们。

如果你有兴趣在一家早期创业公司工作,创造对话式人工智能的未来,请申请加入我们的团队!