栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

IDEA03:数据库CDC、Kafka和连接器Debezium配置

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

IDEA03:数据库CDC、Kafka和连接器Debezium配置

写在前面

这里记录一下CDC和Kafka的协同工作过程。

  • CDC(Change Data Capture:变更数据捕获)是数据库的一项功能,能够监控数据库表的变化。
  • Kafka是一种分布式消息系统。
  • 这里协同的目的是让CDC监控数据库表的更新,然后将更新发布到Kafka,最后让消费者响应这个更新。
  • 另外还用到了一个CDC和Kafka之间的连接器,叫Debezium。
一、配置数据库CDC

这里是针对SQL Server2019进行配置。

  • 关于SQL Server的CDC官方文档可以看这个:https://docs.microsoft.com/zh-cn/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver16。
  • 配置过程主要参考了博文:CDC(Change Data Capture)功能使用及释疑 。

一些要点如下:

  • 在SSMS的工具栏中点击新建查询,打开查询脚本;
  • 执行下面语句,打开数据库的CDC功能(默认关闭);
exec sys.sp_cdc_enable_db 
  • 执行下面语句,在需要监控的数据库表上启用CDC功能;
  • 各个选项的意义可以参考官方文档:https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-enable-table-transact-sql?redirectedfrom=MSDN&view=sql-server-ver16。
EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'Test_Table', -- table_name
        @capture_instance = NULL, -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL, -- role_name
        @index_name = NULL, -- index_name
        @captured_column_list = NULL -- captured_column_list
  • 注意检查SQL Server代理是否开启,如果不开启是不能启用CDC的。
  • 在SQL Server Configuration Manager->SQL Server服务中也可以开启SQL Server代理。

  • 成功开启CDC后,在系统表里面会有6个以cdc开头的表;
  • 执行下列语句应该有返回值。
# select * from cdc.dbo_xxx
select * from cdc.dbo_Test_Table
  • 此时对数据库表执行INSERT,UPDATe和DELETE都会触发CDC产生记录。
二、配置Kafka

Kafka是一个高可用的消息系统,适用于数据批处理和流式数据。这里主要介绍如何在Windows上配置Kafka。

  • Kafka的原理介绍可以参考博文:Kafka基本原理详解。
1.安装Zookeeper
  • 首先是安装Zookeeper,因为Kafka需要依赖于Zookeeper才能运行。
  • 主要参考博文:Windows 安装 Zookeeper 详细步骤。
  • 下载的官网地址:https://zookeeper.apache.org/index.html。
  • 找最新的稳定版本,下载不带源码的版本。

  • 用cmd或者powershell来解压:
tar -zxvf .apache-zookeeper-xxx.tar.gz
  • 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
  • 在conf文件夹中复制一份zoo.cfg配置文件,然后配置data文件夹和log文件夹的地址。

  • 进入bin文件夹,启动Zookeeper

点击zkServer.cmd启动服务端
点击zkCli.cmd启动客户端

2.安装Kafka
  • 主要参考博文:windows下安装kafka教程。
  • 官网下载地址:https://kafka.apache.org/downloads.html。
  • 下载最新二进制版本(Binary downloads),官方推荐是下载Scala 2.13版本。

  • 用cmd或者powershell来解压:
tar -zxvf .apache-zookeeper-xxx.taz
  • 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
  • 在config文件夹下配置server.properties文件。
  • 注意,运行Kafka之前一定要先启动Zookeeper。
  • 回到根目录,下面是一些常用的命令:
# 启动Kafka
.binwindowskafka-server-start.bat .configserver.properties
# 创建主题
.binwindowskafka-topics.bat --create --topic test --bootstrap-server localhost:9092
# 查看所有主题
.binwindowskafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者进程
.binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test
# 创建消费者进程
.binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
三、配置Kafka Connector

Kafka Connect用来连接Kafka和别的系统,是一种通用的连接框架标准。

  • 部署过程主要参考了博文:Kafka Connect的部署和使用详解1(安装配置、基本用法)。

Kafka Connector的部署有两种方式:分布式(distributed)和标准(standalone),这里主要是介绍使用standalone模式部署。

1.配置配置文件
  • 打开配置文件conf/connect-standalone.properties。
  • 检查一下 bootstrap.servers是否为Kafka的地址和端口
  • 重点是设置plugin.path,之后自行下载的connector都要放在该路径下才能生效。
  • 检查offset.storage.file.filename文件是否存在,如果不存在要手动新建一个offsets文件,否则运行会出错。

2.配置Connector测试
  • Kafka自带了FileStreamSinkConnector和FileStreamSourceConnector这两个Connector,可以用于测试。

  • 在conf/connect-file-source.properties中配置:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=./test.txt  # 监控输入的文件路径
topic=connect-test  # Kafka主题名称
  • 在connect-file-sink.properties 中配置:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=./test.sink.txt
topics=connect-test
  • 用命令行启动这两个connector
# 用standalone形式启动connector进程
.binwindowsconnect-standalone.bat config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 在Kafka根目录下新建两个文件test.sink.txt和test.txt。在test.txt输入内容,test.sink.txt会自动输出相同的内容。
3.配置Debezium Connector
  • Debezium是专门用于数据库CDC的连接器,官网文档:https://debezium.io/documentation/reference/1.9/。
  • 针对于SQL Server的官方文档:https://debezium.io/documentation/reference/1.9/connectors/sqlserver.html#sqlserver-example-configuration。

  • 下载连接器存档debezium-connector-sqlserver-1.9.5.Final-plugin.tar.gz,解压后把文件夹放到上面配置的plugin.path路径下即可。

  • 启动Debezium connector需要使用REST API格式。REST API是一种JSON格式,用于HTTP协议的数据传输,可以用Postman软件进行管理和配置。

  • Postman的一些使用教程:API测试之Postman使用完全指南(Postman教程,这篇文章就够了) 。

  • 官方给的REST API配置样例如下:

{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "database.hostname": "192.168.99.100", 
        "database.port": "1433", 
        "database.user": "sa", 
        "database.password": "Password!", 
        "database.dbname": "testDB", 
        "database.server.name": "fullfillment", 
        "table.include.list": "dbo.customers", 
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.fullfillment" 
    }
}
  • 属性的说明如下:

  • 使用Postman发送配置信息即可启动Debezium Connector
  • 常用的一些REST API接口:
GET /connectors:返回所有正在运行的 connector 名
POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
GET /connectors/{name}:获取指定 connetor 的信息
GET /connectors/{name}/config:获取指定 connector 的配置信息
PUT /connectors/{name}/config:更新指定 connector 的配置信息
GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。
GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息
PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume:恢复一个被暂停的 connector
POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。

例如:

# 建立新的connector进程
POST http://192.168.1.133:8083/connectors
# 查看所有的connector进程
GET http://192.168.1.133:8083/connectors
# 查看已安装的connector插件
GET http://192.168.1.133:8083/connector-plugins
# 查看connector进程状态
GET http://192.168.1.133:8083/connectors/test-sqlserver-connector/status
# 删除某个connector进程
DELETE http://192.168.1.133:8083/connectors/test-sqlserver-connector
4.一些注意
  • 由于Debezium Connector不是由配置文件从控制台用.bat启动的,所以在使用REST API发送POST配置之前,一定要确保已经启动了Connector服务。
  • 但由于Connector服务启动时一定要加上某个特定Connector的配置文件作为参数,所以这时候用默认自带的FileStreamSinkConnector和FileStreamSourceConnector作为参数就很合适,尽管这两个Connector我们实际中并不需要。
  • 或许Debezium Connector也可以用配置文件+.bat在控制台启动,但暂时还不清楚如何配置,官方推荐也是用REST API启动的,而且强调了在启动前必须要先启动Connector服务。
四、创建JAVA的消费者

因为生产者已经由上面的CDC充当,所以为了响应CDC的消息,这里用JAVA来实现生产者进程。

  • 主要是参考了博文:三分钟教会你如何使用IDEA操作Kafka创建生产者消费者(有详细案列)。
1.首先是导入Maven依赖包:


       org.apache.kafka
       kafka-clients
       3.1.1

2.配置fastjson库
  • 由于Debezium Connector的CDC信息都是用JSON格式,所以还要下载一个JSON库,这里是用了fastjson。
  • 可以参考博文:fastjson的基本使用方法和java 获取JSONObject中key对应的值。
3.Debezium Connector的CDC消息
  • 使用poll可以从订阅的主题中获取消息,它的参数的含义是如果没有消息等待多久返回empty。
  • 注意,poll不是每次只拉取一条消息的,而是可以一次性拉取很多条,默认最大是500。
  • 关于poll的详细介绍可以参考博文:Apache Kafka(九)- Kafka Consumer 消费行为。
ConsumerRecords poll = consumer.poll(Duration.ofMillis(1000));
  • 如果不commit消息,那么Kafka会认为这条消息还没有被处理成功,下次重启消费者进程后仍然会poll到这些消息。
  • commit消息的方式有两种:自动commit和手动commit,可以参考博文:Kafka消费消息自动提交与手动提交。。
  • 手动commit设置如下:
// disable auto commit of offsets
 properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 一次性提交所有已经poll过的消息,即poll方法返回的最大偏移量
consumer.commitSync();
  • 如果是要在处理完一次poll数据的过程中细粒度地提交,可以参考博客:Kafka系列教程08:Consumer提交消息偏移量。
4.一个消费者进程demo
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Kafka_Test {
    public static void main(String[] args) {
        Properties prop=new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.133:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        // latest收最新的数据 none会报错 earliest最早的数据
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
        // 每次poll只拉取2条消息
        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
        // 创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("fullfillment.dbo.Test_RDF_extractor"));//订阅

        while (true){
            ConsumerRecords poll = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : poll) {
                System.out.println(record.offset()+"t"+record.key()+"t"+record.value());

                // 解析JSON值
                JSONObject json_record_value = JSON.parseObject(record.value());
                JSONObject json_record_value_after = json_record_value.getJSONObject("after");
                String ext_code = json_record_value_after.getString("ext_code");
                System.out.println(ext_code);

                // 细粒度提交消息Offset
                // 构建提交参数,包括partition和offset的信息
                Map offsetMap = new HashMap<>();
                // 使用下一个偏移量作为提交的值,下一次就从这里开始拉取消息
                offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                // 提交指定的offset
                consumer.commitSync(offsetMap);
            }
        }
    }
}
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1036380.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号