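Historical data can be joined as well: both input tables are kept in state indefinitely, and the join result keeps being updated.
Pros: the two tables can always be joined, even when matching records do not arrive at the same time.
Cons: both tables are cached in state, which keeps growing. Each checkpoint therefore takes longer, which eventually causes backpressure in Flink, and if checkpoints time out repeatedly the job fails.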
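To make the state growth described above concrete, here is a toy, single-process Scala model of a regular inner join. It is a sketch under assumptions, not Flink's operator: `Student`, `Score` and `RegularInnerJoin` are hypothetical names, and each side is simply buffered in a map keyed by the join key. Every arriving row is stored and then probed against everything seen so far on the other side, which is why late rows still find their match and why the state never shrinks.

```scala
import scala.collection.mutable

final case class Student(id: String, name: String)
final case class Score(sId: String, sco: Int)

// Toy model of a regular streaming inner join: both sides are buffered forever.
final class RegularInnerJoin {
  private val students = mutable.Map.empty[String, mutable.ListBuffer[Student]]
  private val scores   = mutable.Map.empty[String, mutable.ListBuffer[Score]]

  /** A student row arrives: store it, then join it with every score seen so far for that id. */
  def onStudent(s: Student): List[(String, String, Int)] = {
    students.getOrElseUpdate(s.id, mutable.ListBuffer.empty[Student]) += s
    scores.get(s.id).map(_.toList).getOrElse(Nil).map(b => (s.id, s.name, b.sco))
  }

  /** A score row arrives: store it, then join it with every student seen so far for that id. */
  def onScore(b: Score): List[(String, String, Int)] = {
    scores.getOrElseUpdate(b.sId, mutable.ListBuffer.empty[Score]) += b
    students.get(b.sId).map(_.toList).getOrElse(Nil).map(s => (s.id, s.name, b.sco))
  }

  /** Total number of buffered rows; nothing is ever removed in this model. */
  def stateSize: Int = students.values.map(_.size).sum + scores.values.map(_.size).sum
}
```

Feeding two scores before their student row produces both matches as soon as the student arrives, while every row stays in state afterwards.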
```sql
-- Student stream table; the data is in Kafka
CREATE TABLE student_join (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Score table
CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- inner join: a row is output only when the join key exists on both sides
SELECT a.id, a.name, b.sco
FROM student_join AS a
INNER JOIN score_join AS b
ON a.id = b.s_id;

-- left outer join
SELECT a.id, a.name, b.sco
FROM student_join AS a
LEFT JOIN score_join AS b
ON a.id = b.s_id;

-- full outer join
SELECT a.id, a.name, b.sco
FROM student_join AS a
FULL JOIN score_join AS b
ON a.id = b.s_id;
```

```shell
# Start console producers and send data to the two topics
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,0

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科七班
```

2. Interval Joins (the tables must have a time attribute)
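Only records that fall within a given time range of each other are joined, so state only needs to hold data from that range; data outside it does not need to be kept.
Pros: state stays small.
Cons: if the time range is set badly, records that belong together may fail to join.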
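The join condition used below, and the state clean-up it makes possible, can be sketched in plain Scala. This is a hypothetical model (`IntervalJoinSketch` is not a Flink API) that treats processing times as epoch milliseconds: a student row matches a score row when its time lies in `[sco_time - 10 s, sco_time]`, and because processing time only moves forward, student rows older than `now - 10 s` can never match a future score row and can be dropped.

```scala
// Toy model of: a.stu_time BETWEEN b.sco_time - INTERVAL '10' SECOND AND b.sco_time
// Assumption: times are epoch milliseconds; this is not Flink's interval join operator.
object IntervalJoinSketch {
  val LowerBoundMs: Long = 10000L // student row may be up to 10 s older than the score row
  val UpperBoundMs: Long = 0L     // and at most as new as the score row

  /** True when the student row's time lies in [scoTime - 10 s, scoTime]. */
  def matches(stuTime: Long, scoTime: Long): Boolean =
    stuTime >= scoTime - LowerBoundMs && stuTime <= scoTime + UpperBoundMs

  /** Future score rows have scoTime >= currentTime, so older student rows can never match them. */
  def studentCleanupBefore(currentTime: Long): Long = currentTime - LowerBoundMs

  /** Keeps only the student timestamps that may still join with score rows arriving from now on. */
  def pruneStudents(stuTimes: List[Long], currentTime: Long): List[Long] =
    stuTimes.filter(_ >= studentCleanupBefore(currentTime))
}
```

The pruning step is what keeps the state bounded; it is also why a score arriving more than 10 seconds after its student row finds nothing to join with.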
Processing time: the time at which a record arrives in Flink.

```sql
-- Student stream table; the data is in Kafka
CREATE TABLE student_join_proc (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING,
    stu_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Score table
CREATE TABLE score_join_proc (
    s_id STRING,
    c_id STRING,
    sco INT,
    sco_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Interval join: a student row joins score rows that arrive at most 10 seconds after it
SELECT a.id, a.name, b.sco
FROM student_join_proc AS a, score_join_proc AS b
WHERE a.id = b.s_id
  AND a.stu_time BETWEEN b.sco_time - INTERVAL '10' SECOND AND b.sco_time;
```

```shell
# Produce data to the two topics (the tables read only the latest data, so start the query first)
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100001,1000001,98
1500100001,1000002,5
1500100002,1000003,0

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科七班
```

3. Temporal Joins (a fact table joined with a versioned table)
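Versioned table: a table that has a time attribute and whose data changes over time, so each key has several versions.
Neither table has to be kept in state forever: old versions that can no longer be matched are removed automatically.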
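The version lookup behind `FOR SYSTEM_TIME AS OF` can be modelled with a sorted map per key. In this hypothetical sketch (`TemporalJoinSketch` is not a Flink API; times are epoch milliseconds), an order takes the rate whose `update_time` is the latest one not after `order_time`, and once the watermark has passed a newer version, all older versions can no longer be matched and are dropped, which is what keeps the state small.

```scala
import scala.collection.immutable.TreeMap

// Toy model of an event-time temporal join against one key of a versioned table.
// Assumption: versions are held as update_time -> conversion_rate; not Flink's operator.
object TemporalJoinSketch {
  type Versions = TreeMap[Long, BigDecimal]

  /** Rate valid AS OF orderTime; None if the order predates every version (LEFT JOIN gives null). */
  def rateAsOf(versions: Versions, orderTime: Long): Option[BigDecimal] =
    versions.rangeTo(orderTime).lastOption.map(_._2)

  /** Keeps the newest version at or before the watermark and everything after it. */
  def expire(versions: Versions, watermark: Long): Versions =
    versions.rangeTo(watermark).lastOption match {
      case Some((keepFrom, _)) => versions.rangeFrom(keepFrom)
      case None                => versions
    }
}
```

For example, with versions at 1000 ms and 2000 ms, an order at 1500 ms picks the first rate, and a watermark of 2500 ms makes the first version unreachable, so it is removed.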
```sql
-- Orders table; order data is produced into it. The two tables are joined on the rate table's primary key
CREATE TABLE orders (
    order_id STRING,          -- order ID
    price DECIMAL(32, 2),     -- order price
    currency STRING,          -- primary key of the rate table
    order_time TIMESTAMP(3),  -- time the order was placed
    WATERMARK FOR order_time AS order_time  -- event time and watermark
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-----------------------------------------------------------------------------------
-- Exchange-rate table. Canal monitors rate updates in MySQL; every historical version
-- is written to Kafka as canal-json records
CREATE TABLE currency_rates (
    currency STRING,                 -- primary key of the rate table
    conversion_rate DECIMAL(32, 2),  -- exchange rate
    update_time TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,  -- rate update time
    WATERMARK FOR update_time AS update_time,  -- time attribute and watermark
    PRIMARY KEY (currency) NOT ENFORCED        -- primary key
) WITH (
    'connector' = 'kafka',
    'topic' = 'bigdata.currency_rates',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset',
    'canal-json.ignore-parse-errors' = 'true'
);

-----------------------------------------------------------------------------------
-- Temporal join
SELECT order_id, price, orders.currency, conversion_rate, order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
```

Explanation:

1. Like a zipper (slowly changing) table, the versioned table keeps the version that was valid for each past period, and the join picks the version valid at the matching time.
2. The order time is used to look up the rate table. The watermarks must line up: the join is only triggered once the watermark has been reached. Only when the rate table receives a newer update is it known that the rate for the previous period did not change, so those orders can then be joined.
3. For example, a rate update at 11:20 means that all orders before 11:20 can be joined.
4. A similar use case is an order-status table, where an order moves through states such as placed, paid, received, returned and refunded.

```shell
# Order data
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
001,1000.0,1001,2022-08-02 11:08:20
001,1000.0,1001,2022-08-02 11:15:55
001,1000.0,1001,2022-08-02 11:20:55
```

4. Stream table (unbounded, Kafka) joined with a dimension table (bounded, HBase, MySQL)
4.1 Regular join (requires the two tables' ...)
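Problem: when the dimension table is updated in the database, Flink cannot capture the update, so the join only sees the data that was read when the job started.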
```sql
-- JDBC dimension table
-- Bounded source: it is read once and then finishes, so Flink does not see later database updates
CREATE TABLE student_mysql (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
);

-----------------------------------------------------------------------------------
-- Score table
-- Unbounded stream
CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-----------------------------------------------------------------------------------
-- Join the unbounded stream with the bounded table
SELECT b.id, b.name, a.sco
FROM score_join AS a
JOIN student_mysql AS b
ON a.s_id = CAST(b.id AS STRING);
```

```shell
# Produce data to the score topic
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100003,1000001,98
1500100004,1000002,5
1500100001,1000003,0
```

4.2 Lookup Join (stream table joined with a dimension table by querying it)
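1. Each time a record arrives on the stream table, Flink uses the join key to query the dimension table's underlying database.
2. Problem: the two tables are not joined directly in state, but if every record has to go to the database, throughput is low. How can this be solved?
3. Enable the lookup cache and set its size and expiry time: up to a given number of dimension rows are kept in the cache for a given time. The trade-off is that if the dimension table changes and a record is looked up before the cached entry expires, it is joined with the old version of the data.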
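The effect of `lookup.cache.max-rows` and `lookup.cache.ttl` can be illustrated with a small Scala cache. This is a sketch under assumptions, not the JDBC connector's code: `LookupCache` is a hypothetical class, `loadFromDb` stands in for the per-row database query, the clock is passed in explicitly, and eviction is LRU. It shows both sides of the trade-off in item 3: repeated keys stop hitting the database, but a change made in the database is not seen until the cached entry expires.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Toy lookup cache: at most maxRows entries, each valid for ttlMs after it was loaded.
final class LookupCache[K, V](maxRows: Int, ttlMs: Long, loadFromDb: K => Option[V]) {
  // One cached lookup result (including "not found") and the time it was loaded.
  private final case class Entry(value: Option[V], loadedAt: Long)

  // accessOrder = true orders entries least-recently-used first,
  // so removeEldestEntry evicts the LRU entry once the cache exceeds maxRows.
  private val cache = new JLinkedHashMap[K, Entry](16, 0.75f, true) {
    override protected def removeEldestEntry(eldest: JMap.Entry[K, Entry]): Boolean =
      size() > maxRows
  }

  private var queries = 0

  /** How many lookups actually went to the database. */
  def dbQueries: Int = queries

  /** Returns the cached value while it is younger than ttlMs, otherwise queries the database again. */
  def get(key: K, now: Long): Option[V] = {
    val cached = cache.get(key)
    if (cached != null && now - cached.loadedAt < ttlMs) cached.value
    else {
      queries += 1
      val fresh = loadFromDb(key)
      cache.put(key, Entry(fresh, now))
      fresh
    }
  }
}
```

With a 30-second TTL, a row updated in the database 10 seconds after it was cached is still served in its old form until the entry expires, which matches the `'lookup.cache.ttl' = '30s'` setting in the DDL.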
The stream table goes on the left and the dimension table on the right.

```sql
-- JDBC dimension table
-- Bounded source
CREATE TABLE student_mysql (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456',
    'lookup.cache.max-rows' = '100',  -- enable the cache and set how many rows it holds; improves join performance
    'lookup.cache.ttl' = '30s'        -- cache expiry; usually chosen according to how often the dimension table changes
);

-----------------------------------------------------------------------------------
-- Score table
-- Unbounded stream
CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT,
    pro_time AS PROCTIME()  -- a lookup join requires a processing-time attribute on the stream table
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-----------------------------------------------------------------------------------
SELECT b.id, b.name, b.age, a.sco
FROM score_join AS a
LEFT JOIN student_mysql FOR SYSTEM_TIME AS OF a.pro_time AS b
ON CAST(a.s_id AS BIGINT) = b.id;
```

```shell
# Produce data to the score topic
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100003,1000001,98
1500100004,1000002,5
1500100001,1000003,0
```

5. User-defined functions
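A `class` defines a type; its methods can only run on an instance, so an object must be created first.
An `object` is where the `main` method goes, so the code can be run, and its methods can be called directly by name without creating an instance.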
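A minimal Scala illustration of the difference (the names `Greeter` and `GreeterApp` are hypothetical). This is also why the UDF is written as a `class`: Flink creates instances of the function class itself when it is registered, rather than calling methods on a singleton.

```scala
// A class: its methods need an instance created with `new`.
class Greeter(prefix: String) {
  def greet(name: String): String = s"$prefix, $name"
}

// An object: a singleton whose methods are called by name, and the home of `main`.
object GreeterApp {
  def shout(s: String): String = s.toUpperCase

  def main(args: Array[String]): Unit = {
    val g = new Greeter("Hello") // create an instance before calling an instance method
    println(shout(g.greet("flink")))
  }
}
```

Running `GreeterApp` prints `HELLO, FLINK`.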
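```scala
// The package must match the class name used when the function is registered
package com.shujia.flink.sql

import org.apache.flink.table.functions.ScalarFunction

// Scalar UDF: returns the part of s from index begin (inclusive) to end (exclusive)
class SubstringFunction extends ScalarFunction {
  def eval(s: String, begin: Integer, end: Integer): String = {
    // SQL NULL arguments arrive as null; return NULL instead of failing the job
    if (s == null || begin == null || end == null) null
    else s.substring(begin, end)
  }
}
```

5.2 Start sql-client with the jar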
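Package the project and upload the jar to the cluster, then start the SQL client with the jar so that Flink can find the class:

```shell
sql-client.sh -j flink-1.0.jar
```

Alternatively, put the jar in Flink's `lib` directory.

5.3 Create the function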
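Register the function:

```sql
CREATE TEMPORARY SYSTEM FUNCTION substringFunction
AS 'com.shujia.flink.sql.SubstringFunction'
LANGUAGE SCALA;
```

After registration it can be called like a built-in function, for example `substringFunction(name, 0, 1)`.

6. Exactly-once in Flink SQL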
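Problem: when a SQL job fails and is restarted, it recomputes everything from the beginning, which causes backpressure. How can this be solved?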
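The sink table used here declares `PRIMARY KEY (word)`, so the JDBC sink upserts rows instead of appending them. A toy Scala model (`ReplaySketch` is a hypothetical name, and the replay after a failure is simplified to re-sending the same updates) shows why this matters: updates written again after a restart overwrite the same rows, so the final table is unchanged, whereas an append-only table would end up with duplicates.

```scala
import scala.collection.mutable

// Toy model of replaying (word, count) updates into an upsert sink versus an append sink.
object ReplaySketch {
  /** One (word, runningCount) update per input word, like `select word, count(1) ... group by word`. */
  def countUpdates(words: Seq[String]): Seq[(String, Long)] = {
    val counts = mutable.Map.empty[String, Long]
    words.toList.map { w =>
      val c = counts.getOrElse(w, 0L) + 1
      counts(w) = c
      w -> c
    }
  }

  /** Upsert by primary key: a repeated update overwrites the same row, so replays are harmless. */
  def upsert(table: Map[String, Long], updates: Seq[(String, Long)]): Map[String, Long] =
    updates.foldLeft(table) { case (t, (w, c)) => t.updated(w, c) }

  /** Append: every update becomes a new row, so a replay duplicates rows. */
  def append(table: Vector[(String, Long)], updates: Seq[(String, Long)]): Vector[(String, Long)] =
    table ++ updates
}
```

Applying the same updates twice leaves the upsert table identical, while the append table doubles in size.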
- Enable checkpointing.
  - It can be enabled for all jobs at once in the Flink configuration file.
- Create a SQL file and put all the SQL statements in it.
- To resume from a retained checkpoint instead of recomputing from scratch, the SQL file can begin with `SET 'execution.savepoint.path' = '<checkpoint directory>';`.

```sql
---------- source table ----------
CREATE TABLE words (
    `word` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'words_exactly_once',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasd',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

---------- table created in MySQL ----------
CREATE TABLE `word_count` (
    `word` varchar(255) NOT NULL,
    `c` bigint(20) DEFAULT NULL,
    PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

---------- JDBC sink table ----------
CREATE TABLE word_count (
    word STRING,
    c BIGINT,
    PRIMARY KEY (word) NOT ENFORCED  -- rows are updated by primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'word_count',  -- the table must be created in the database manually first
    'username' = 'root',
    'password' = '123456'
);

---------- count words and save the result to the database ----------
INSERT INTO word_count
SELECT word, COUNT(1) AS c
FROM words
GROUP BY word;
```

```shell
# Produce data
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words_exactly_once

# Restart the SQL job from the SQL file
sql-client.sh -f word_count.sql
```

7. Execute a set of SQL statements
```sql
---------- source table ----------
CREATE TABLE words (
    `word` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'words_exactly_once',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasd',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

---------- sink table ----------
CREATE TABLE print_table (
    word STRING,
    c BIGINT
) WITH ('connector' = 'print');

-- Run several INSERT INTO statements as one job.
-- The source table only needs to be read once: one input, multiple outputs.
-- word_count is the JDBC sink table from section 6.
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO print_table SELECT word, COUNT(1) AS c FROM words GROUP BY word;
    INSERT INTO word_count SELECT word, COUNT(1) AS c FROM words GROUP BY word;
END;
```

The general form:

```sql
EXECUTE STATEMENT SET
BEGIN
    -- write the INSERT INTO statements here
END;
```