新的Apache Kafka到AWS S3连接器

2020-11-21 19:40:49

一段时间以来,社区中的许多人一直在要求我们开发新的Kafka至S3连接器。因此,我们很高兴地宣布它现已可用。

与现有的S3连接器相比,它具有许多优势:

与我们的其他Stream Reactors一样,该连接器扩展了标准的connect配置,并为SQL命令(Lenses Kafka Connect查询语言或“ KCQL”)添加了一个参数。这定义了如何将数据从源(在本例中为Kafka)映射到目标(S3)。

重要的是,它还包括如何将数据划分为S3,存储桶名称和序列化格式(支持包括JSON,Avro,Parquet,Text,CSV和二进制)。

连接器支持对S3存储桶的多种不同身份验证机制,包括标准AWS凭证(使用访问和密钥)以及通过EC2的IAM角色。

在需要凭证的地方,可以使用Lenses秘密管理器插件与常见的密钥管理解决方案(例如Hashicorp Vault和AWS Secret Manager)集成。

控制数据在接收时的结构方式,可以让您针对特定的用例,工作负载和下游应用程序(例如Snowflake或AWS Athena,Glue等)的读取模式进行优化,并降低成本。而且,您避免在发送到S3之前必须构建和部署复杂的流处理工作负载。

像所有Stream Reactor和Kafka Connect连接器一样,该连接器可以通过传统的部署工具和框架进行管理,也可以通过Lenses进行管理。这为如何管理Kafka连接器增加了一层RBAC,治理,审计,错误处理和监视。

定义为Connect configuration&SQL的整个工作负载可帮助您通过配置和通过GitOps管理数据管道。

假设存在一个场景,其中我们在Kafka主题backblaze_smart中具有与某些产品的制造过程失败相关的记录,并且事件需要存储在S3存储桶中。

由不同数据团队针对每种模型运行的下游分析解决方案(例如AWS Athena)需要访问此数据。

出于读取性能和成本的原因,数据团队要求将与每个模型相关的消息按特定的命名约定在S3存储桶中划分为不同的对象,并放置在manufacturing_failures文件夹中。

connect.s3.kcql =插入INTO制造:manufacturing_failures SELECT序列号,型号,容量字节,失败FROM backblaze_smartPARTITIONBY模型STOREAS`Json`WITH_FLUSH_COUNT = 300

Lenses.io Box是一个免费的多合一Kafka + Lenses开发泊坞窗。这是用于开发实时应用程序和测试连接器的理想环境,并且附带示例数据。

如果您喜欢使用自己的Lenses或Kafka Connect环境,则可以从此处下载连接器,并从lens.io/start请求免费试用Lenses。

我们将创建一个IAM角色,以配置EC2身份验证以访问S3存储桶。如果您不在EC2实例上运行Lenses Box,则不需要这样做,但是您需要提供一个Access and Secret键来写入S3。

3.在存储桶名称中,输入存储桶的名称。请勿使用连字符或其他特殊字符。这是以后连接器的要求。

记下您的存储桶的AWS区域,稍后将需要它。

为了使EC2身份验证起作用,必须先创建IAM角色,然后才能启动将运行Kafka Connect的实例(Lenses Box泊坞窗与Kafka Connect打包在一起)。

创建IAM角色后,它将被附加到运行Kafka Connect / Box的实例上。

或者,如果您将改为使用AWS Secret and Access密钥对S3存储桶进行身份验证,请按照此处描述的过程进行操作。

3.在“选择角色类型”页面上,选择EC2和EC2用例。选择“下一步:权限”。

5.打开JSON编辑器并粘贴以下策略,将存储桶名称替换为您的存储桶名称。

此策略允许访问S3存储桶以及以后需要的Kafka功能。

6.在Review页面上,为策略输入名称S3DataAccessPolicy,然后选择Create policy。

7.返回到用于创建角色的上一个浏览器选项卡,并刷新策略列表,然后查找并检查新创建的策略。

如果您要在运行docker的EC2实例上进行部署,建议您在t2.large实例上运行。

为了使EC2身份验证有效,您需要确保将与正在运行的实例相关联的IAM角色分配给刚刚创建的S3DataAccessRole。

需要在安全组中配置端口3030才能访问镜头。根据您要访问的其他服务,您可能需要打开某些端口(但对于本演练不是必需的)。

1.从以下网址免费索取Box许可证密钥:https://lenses.io/box/。您会收到一封包含docker run命令的电子邮件

注意:如果要为此实例配置自己的外部生产者/消费者,请相应地设置ADV_HOST地址。但是,您还需要确保在安全组中打开端口9092。

2.“浏览”页面充当实时数据目录。搜索backblaze_smart主题。本主题包括Box环境中来自样品生产者的实时流量。

3.为了更准确地确定要插入S3的字段,请在SQL Studio中运行以下语句

1.确保使用AWS CLI从环境中访问S3存储桶通常是一个好主意。

2.如果成功访问存储桶,请从Lenses中,从顶层菜单中选择“连接器”。 Box中应该已经配置了许多示例连接器。

3.单击新建连接器。镜头将列出默认情况下包装在包装盒中的所有可用连接器。

5.输入连接器的详细信息。注意在参数aws.custom.endpoint&aws.region中更新S3存储桶的区域。确保使用先前在connect.s3.kcql参数中创建的存储桶来更新存储桶名称MyBucketName。

connect.s3.kcql = INSERT INTO MyBucketName:manufacturing_failures SELECT序列号,模型,容量字节,失败FROM backblaze_smartPARTITIONBY模型STOREAS`Json`WITH_FLUSH_COUNT = 3

如果您使用凭据而不是EC2进行身份验证,请输入密钥和访问密钥。

如果成功,请准备免费使用Lens.io/start上提供的Lenses试用版,将连接器部署到自己的Kafka Connect&Kafka环境中。