将数据从Postgres流式传输到Apache Kafka

2020-09-22 23:51:43

从数据库获取数据到Apache Kafka肯定是Kafka Connect最流行的用例之一。Kafka Connect提供了可扩展且可靠的方式将数据移入和移出Kafka。因为它将插件用于连接器的特定插件,并且它只通过配置运行(不需要编写代码),所以它是一个简单的集成点。

我们可以使用以下docker-compose文件启动并运行带有单个代理的Kafka集群。

版本:';2';服务:ZooKeeper:image:confluentinc/cp-zooKeeper:最新主机名:ZooKeeper容器名称:ZooKeeper端口:-";2181:2181";环境:ZooKeeper_Client_Port:2181 zooKeeper_Tick_Time:2000kafka:image:confluentinc/cp-kafka:最新主机名:kafka容器名称:kafka依赖_on:-zooKeeper端口:-";9092:9092";-";29092:29092";环境:KAFKA_BROKER_ID:1KAFKA_ZOOKEVEN_CONNECT:动物园保管员:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:明文:明文,明文_HOST:明文KAFKA_ADVERSED_LISTENERS:明文://KAFKA:9092,明文_HOST://LOCALHOST:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1。

如果您希望使用用户界面而不是控制台工具来管理Kafka,Confluent Control Center是最佳选择之一。这是商业工具,但有30天的许可证。还有Landoop UI,也有Kafka Connect管理界面。如果您想使用汇流控制中心,您可以将其添加为服务docker-compose文件,如下所示:

控制中心:图像:confluentinc/cp-enterprise-control-center:5.5.1主机名:控制中心容器名称:控制中心依赖于:-ZooKeeper-Kafka端口:-";9021:9021";环境:Control_Center_Bootstrap_Servers:';Kafka:9092';Control_Center_ZooKeeper_CONNECT:';ZooKeeper:2181';CONTROL_CENTER_REPLICATION_FACTOR:1 CONTROL_CENTER_INTERNAL_TOPERS_PARTITIONS:1 CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS:1合流指标_主题_复制:1端口:9021。

从Confluent Hub下载Kafka Connect JDBC插件,并将zip文件解压到Kafka Connect的插件路径。当我们启动Kafka Connector时,我们可以指定将用于访问插件库的插件路径。例如plugin.path=/usr/local/share/kafka/plugins。有关详细信息,请查看手动安装连接器文档。

我们还需要JDBC 4.0驱动程序,因为连接器将使用它与数据库通信。*PostgreSQL和SQLite驱动程序已经随JDBC连接器插件一起提供。如果您希望连接到另一个数据库系统,请将驱动程序添加到与kafka-connect-jdbc jar文件相同的文件夹中。请参见安装JDBC驱动程序手册。

我们可以使用位于Kafka bin目录内的connect-Distributed.sh脚本运行Kafka Connect。在运行此脚本以配置Worker属性时,我们需要提供属性文件。

#用于建立到kafka集群的初始连接的主机/端口对列表。bootstrap.servers=localhost:29092#集群的唯一名称,用于形成连接集群组。请注意,这不能与消费者组IDsgroup.id=connect-cluster#冲突。转换器指定Kafka中的数据格式以及如何将其转换为Connect数据。用于存储偏移量的key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=truevalue.converter.schemas.enable=true#主题。本主题应具有多个分区,并且应复制compacted.offset.storage.topic=connect-offsetsoffset.storage.replication.factor=1#主题以用于存储连接器和任务配置;请注意,此主题应是用于存储状态的单个分区、高度复制的config.storage.topic=connect-configsconfig.storage.replication.factor=1#主题。本主题可以有多个分区,并且复制和刷新compacted.status.storage.topic=connect-statusstatus.storage.replication.factor=1#的速度应该比正常快得多,这对testing/debuggingoffset.flush.interval.ms=10000plugin.path=/Users/cemalturkoglu/kafka/plugins很有用。

请注意,plugin.path是我们放置下载的库所需的路径。

在运行连接器之后,我们可以确认连接器的REST端点是可访问的,并且我们可以通过调用http://localhost:8083/connector-plugins来确认jdbc连接器在插件列表中。

当我们在分布式模式下操作时,我们通过使用配置JSON调用REST端点来运行连接器。我们可以从cURL命令的文件中指定配置有效负载。以下命令启动连接器。

{";名称";:";jdbc_source_connector_postgresql_01";,";配置";:{";connector.class";:";io.confluent.connect.jdbc.JdbcSourceConnector";,";connection.url";:";jdbc:postgresql://localhost:5432/demo";,";connection.user";:";Postgres";,";connection.password";:";root";,";topic.prefix";:";postgres-01-";,";poll.interval.ms";:3600000,";mode";:";Bulk";}}。

它将创建每个表的卡夫卡主题。主题使用topic.prefix+<;table_name>;命名。

模式配置用于指定将在下面讨论的工作模式。批量模式用于加载所有数据。

我们可以看到,我的DEMO数据库中有4个表被加载到了4个Kafka主题中:

{";schema";:{";type";:";struct";,";field";:[{";type";:";int64";,";可选";:false,";field";:";id";},{";type";:";String";,";可选";:true,";field";::";Flat";},{";type";:";string";,";:true,";field";:";postal_code";},{";type";:";string";,";可选";:true,";field";:";Street";},{";type";:";string";,";可选";:true,";field";:";title";},{";type";:";int64";,";可选";:true,";field";:";City_id";},{";type";:";int64";,";可选";:true,";field";:";user_id";}],";可选";:false,";name";:";地址";},";有效载荷";:{";id";:3,";平面";:";3/1B";,";邮政编码";:";ABX501";,";街道";:";街道2";,";标题";:";工作地址";,";City_id";:7,";user_id";:3}}。

请注意,它包含字段属性,该属性包含有关字段的信息和实际数据的有效负载。

默认情况下,查询所有要复制的表。但是,我们在按table.Whitelist和table.Blacklist配置复制时包括或排除表列表。我们可以同时使用黑名单或白名单。

除了上面演示中使用的批量模式之外,还有其他增量查询模式。只有在发生更改时,才能使用增量模式加载数据。某些列用于检测表或行中是否有更改。

批量:在此模式下,连接器将在每次迭代中加载所有选定的表。如果迭代间隔设置为某个较小的数字(默认值为5秒),则加载所有数据没有多大意义,因为会有重复的数据。如果定期备份或转储整个数据库,它可能会很有用。

递增:此模式使用对每行唯一的单个列,理想情况下自动递增主键来检测表中的更改。如果添加具有新ID的新行,它将被复制到Kafka。但是,此模式不会更改ID,因此缺乏捕获行上的更新操作的能力。incrementing.column n.name用于配置列名。

TIMESTAMP:使用显示上次修改时间戳的单个列,并且在每次迭代中只查询自该时间以来修改过的行。由于时间戳不是唯一字段,因此可能会遗漏一些具有相同时间戳的更新。TimeStamp p.column n.name用于配置列名。

TIMESTAMP+INCRENTING:使用唯一递增ID和TIMESTAMP的最健壮、最准确的模式。唯一的缺点是需要在旧表上添加修改时间戳列。

查询:连接器支持在每次迭代中使用自定义查询来获取数据。就增量更改而言,它不是很灵活。从非常宽的表中仅提取必要的列,或者获取包含多个连接表的视图,这可能很有用。如果查询变得复杂,则会增加对数据库的负载和性能影响。

如上所述,仅使用唯一ID或时间戳存在缺陷。将它们一起使用是更好的方法。以下配置显示了时间戳+递增模式的示例:

{";名称";:";jdbc_source_connector_postgresql_02";,";配置";:{";connector.class";:";io.confluent.connect.jdbc.JdbcSourceConnector";,";connection.url";:";jdbc:postgresql://localhost:5432/demo-db";,";connection.user";:";Postgres";,";connection.password";:";root";,";topic.prefix";:";postgres-02-";,";table.Whitelist";:";商店,标签,类别,地址,城市";,";mode";:";timestamp+incrementing";,";Timestp.Column.Name";:";LAST_MODIFIED_DATE";,";validate.non.null";:false,";db.timezone";:";欧洲/华沙";}}。

注使用validate.non.null是因为连接器要求时间戳列不为NULL,我们可以将这些列设置为NOT NULL,也可以通过设置validate.not.null false来禁用此验证。

而使用数据库系统的时间戳列时区很重要。由于时间不匹配,可能会有不同的行为,因此可以通过db.timezone进行配置。

白名单配置用于将表限制为给定列表。因此,这5个表被复制到卡夫卡主题中。

如上所述,使用不带时间戳的递增模式会导致不捕获表上的更新操作。使用时间戳+递增模式时,还会捕获更新操作。

JDBC连接器是将数据从关系数据库传送到Kafka的很好的开始方式。它很容易安装和使用,只需要配置几个属性就可以让您的数据流出。但是,JDBC连接器也有一些缺点。一些缺点可以列举如下:

它需要不断运行查询,因此会在物理数据库上生成一些负载。为了不造成性能影响,查询应该保持简单,并且不应该过度使用可伸缩性。

由于最需要增量时间戳,因此处理遗留数据存储将需要额外的工作来添加列。也可能存在无法更新架构的情况。

JDBC连接器无法获取删除操作,因为它使用SELECT查询来检索数据,并且没有复杂的机制来检测删除的行。您可以实施您的解决方案来克服此问题。