这里记录一下CDC和Kafka的协同工作过程。
- CDC(Change Data Capture:变更数据捕获)是数据库的一项功能,能够监控数据库表的变化。
- Kafka是一种分布式消息系统。
- 这里协同的目的是让CDC监控数据库表的更新,然后将更新发布到Kafka,最后让消费者响应这个更新。
- 另外还用到了一个CDC和Kafka之间的连接器,叫Debezium。
这里是针对SQL Server2019进行配置。
- 关于SQL Server的CDC官方文档可以看这个:https://docs.microsoft.com/zh-cn/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver16。
- 配置过程主要参考了博文:CDC(Change Data Capture)功能使用及释疑 。
一些要点如下:
- 在SSMS的工具栏中点击新建查询,打开查询脚本;
- 执行下面语句,打开数据库的CDC功能(默认关闭);
exec sys.sp_cdc_enable_db
- 执行下面语句,在需要监控的数据库表上启用CDC功能;
- 各个选项的意义可以参考官方文档:https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-enable-table-transact-sql?redirectedfrom=MSDN&view=sql-server-ver16。
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', -- source_schema @source_name = 'Test_Table', -- table_name @capture_instance = NULL, -- capture_instance @supports_net_changes = 1, -- supports_net_changes @role_name = NULL, -- role_name @index_name = NULL, -- index_name @captured_column_list = NULL -- captured_column_list
- 注意检查SQL Server代理是否开启,如果不开启是不能启用CDC的。
- 在SQL Server Configuration Manager->SQL Server服务中也可以开启SQL Server代理。
- 成功开启CDC后,在系统表里面会有6个以cdc开头的表;
- 执行下列语句应该有返回值。
# select * from cdc.dbo_xxx select * from cdc.dbo_Test_Table
- 此时对数据库表执行INSERT,UPDATe和DELETE都会触发CDC产生记录。
Kafka是一个高可用的消息系统,适用于数据批处理和流式数据。这里主要介绍如何在Windows上配置Kafka。
- Kafka的原理介绍可以参考博文:Kafka基本原理详解。
- 首先是安装Zookeeper,因为Kafka需要依赖于Zookeeper才能运行。
- 主要参考博文:Windows 安装 Zookeeper 详细步骤。
- 下载的官网地址:https://zookeeper.apache.org/index.html。
- 找最新的稳定版本,下载不带源码的版本。
- 用cmd或者powershell来解压:
tar -zxvf .apache-zookeeper-xxx.tar.gz
- 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
- 在conf文件夹中复制一份zoo.cfg配置文件,然后配置data文件夹和log文件夹的地址。
- 进入bin文件夹,启动Zookeeper
2.安装Kafka点击zkServer.cmd启动服务端
点击zkCli.cmd启动客户端
- 主要参考博文:windows下安装kafka教程。
- 官网下载地址:https://kafka.apache.org/downloads.html。
- 下载最新二进制版本(Binary downloads),官方推荐是下载Scala 2.13版本。
- 用cmd或者powershell来解压:
tar -zxvf .apache-zookeeper-xxx.taz
- 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
- 在config文件夹下配置server.properties文件。
- 注意,运行Kafka之前一定要先启动Zookeeper。
- 回到根目录,下面是一些常用的命令:
# 启动Kafka .binwindowskafka-server-start.bat .configserver.properties # 创建主题 .binwindowskafka-topics.bat --create --topic test --bootstrap-server localhost:9092 # 查看所有主题 .binwindowskafka-topics.bat --list --bootstrap-server localhost:9092 # 创建生产者进程 .binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test # 创建消费者进程 .binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning三、配置Kafka Connector
Kafka Connect用来连接Kafka和别的系统,是一种通用的连接框架标准。
- 部署过程主要参考了博文:Kafka Connect的部署和使用详解1(安装配置、基本用法)。
Kafka Connector的部署有两种方式:分布式(distributed)和标准(standalone),这里主要是介绍使用standalone模式部署。
1.配置配置文件- 打开配置文件conf/connect-standalone.properties。
- 检查一下 bootstrap.servers是否为Kafka的地址和端口
- 重点是设置plugin.path,之后自行下载的connector都要放在该路径下才能生效。
- 检查offset.storage.file.filename文件是否存在,如果不存在要手动新建一个offsets文件,否则运行会出错。
-
Kafka自带了FileStreamSinkConnector和FileStreamSourceConnector这两个Connector,可以用于测试。
-
在conf/connect-file-source.properties中配置:
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=./test.txt # 监控输入的文件路径 topic=connect-test # Kafka主题名称
- 在connect-file-sink.properties 中配置:
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=./test.sink.txt topics=connect-test
- 用命令行启动这两个connector
# 用standalone形式启动connector进程 .binwindowsconnect-standalone.bat config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- 在Kafka根目录下新建两个文件test.sink.txt和test.txt。在test.txt输入内容,test.sink.txt会自动输出相同的内容。
- Debezium是专门用于数据库CDC的连接器,官网文档:https://debezium.io/documentation/reference/1.9/。
- 针对于SQL Server的官方文档:https://debezium.io/documentation/reference/1.9/connectors/sqlserver.html#sqlserver-example-configuration。
-
下载连接器存档debezium-connector-sqlserver-1.9.5.Final-plugin.tar.gz,解压后把文件夹放到上面配置的plugin.path路径下即可。
-
启动Debezium connector需要使用REST API格式。REST API是一种JSON格式,用于HTTP协议的数据传输,可以用Postman软件进行管理和配置。
-
Postman的一些使用教程:API测试之Postman使用完全指南(Postman教程,这篇文章就够了) 。
-
官方给的REST API配置样例如下:
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "192.168.99.100", "database.port": "1433", "database.user": "sa", "database.password": "Password!", "database.dbname": "testDB", "database.server.name": "fullfillment", "table.include.list": "dbo.customers", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.fullfillment" } }
- 属性的说明如下:
- 使用Postman发送配置信息即可启动Debezium Connector
- 常用的一些REST API接口:
GET /connectors:返回所有正在运行的 connector 名 POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。 GET /connectors/{name}:获取指定 connetor 的信息 GET /connectors/{name}/config:获取指定 connector 的配置信息 PUT /connectors/{name}/config:更新指定 connector 的配置信息 GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。 GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。 GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息 PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。 PUT /connectors/{name}/resume:恢复一个被暂停的 connector POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用 POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。 DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。
例如:
# 建立新的connector进程 POST http://192.168.1.133:8083/connectors # 查看所有的connector进程 GET http://192.168.1.133:8083/connectors # 查看已安装的connector插件 GET http://192.168.1.133:8083/connector-plugins # 查看connector进程状态 GET http://192.168.1.133:8083/connectors/test-sqlserver-connector/status # 删除某个connector进程 DELETE http://192.168.1.133:8083/connectors/test-sqlserver-connector4.一些注意
- 由于Debezium Connector不是由配置文件从控制台用.bat启动的,所以在使用REST API发送POST配置之前,一定要确保已经启动了Connector服务。
- 但由于Connector服务启动时一定要加上某个特定Connector的配置文件作为参数,所以这时候用默认自带的FileStreamSinkConnector和FileStreamSourceConnector作为参数就很合适,尽管这两个Connector我们实际中并不需要。
- 或许Debezium Connector也可以用配置文件+.bat在控制台启动,但暂时还不清楚如何配置,官方推荐也是用REST API启动的,而且强调了在启动前必须要先启动Connector服务。
因为生产者已经由上面的CDC充当,所以为了响应CDC的消息,这里用JAVA来实现生产者进程。
- 主要是参考了博文:三分钟教会你如何使用IDEA操作Kafka创建生产者消费者(有详细案列)。
2.配置fastjson库org.apache.kafka kafka-clients 3.1.1
- 由于Debezium Connector的CDC信息都是用JSON格式,所以还要下载一个JSON库,这里是用了fastjson。
- 可以参考博文:fastjson的基本使用方法和java 获取JSONObject中key对应的值。
- 使用poll可以从订阅的主题中获取消息,它的参数的含义是如果没有消息等待多久返回empty。
- 注意,poll不是每次只拉取一条消息的,而是可以一次性拉取很多条,默认最大是500。
- 关于poll的详细介绍可以参考博文:Apache Kafka(九)- Kafka Consumer 消费行为。
ConsumerRecordspoll = consumer.poll(Duration.ofMillis(1000));
- 如果不commit消息,那么Kafka会认为这条消息还没有被处理成功,下次重启消费者进程后仍然会poll到这些消息。
- commit消息的方式有两种:自动commit和手动commit,可以参考博文:Kafka消费消息自动提交与手动提交。。
- 手动commit设置如下:
// disable auto commit of offsets properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 一次性提交所有已经poll过的消息,即poll方法返回的最大偏移量 consumer.commitSync();
- 如果是要在处理完一次poll数据的过程中细粒度地提交,可以参考博客:Kafka系列教程08:Consumer提交消息偏移量。
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class Kafka_Test { public static void main(String[] args) { Properties prop=new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.133:9092"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // latest收最新的数据 none会报错 earliest最早的数据 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1"); // 每次poll只拉取2条消息 prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); // 创建消费者 KafkaConsumerconsumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("fullfillment.dbo.Test_RDF_extractor"));//订阅 while (true){ ConsumerRecords poll = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : poll) { System.out.println(record.offset()+"t"+record.key()+"t"+record.value()); // 解析JSON值 JSONObject json_record_value = JSON.parseObject(record.value()); JSONObject json_record_value_after = json_record_value.getJSONObject("after"); String ext_code = json_record_value_after.getString("ext_code"); System.out.println(ext_code); // 细粒度提交消息Offset // 构建提交参数,包括partition和offset的信息 Map offsetMap = new HashMap<>(); // 使用下一个偏移量作为提交的值,下一次就从这里开始拉取消息 offsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata")); // 提交指定的offset consumer.commitSync(offsetMap); } } } }