栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

Flink学习13:Flink外接kafka数据源

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink学习13:Flink外接kafka数据源

1.flink外部数据源(kafka)简介
 kafka的broker


borker即是kafka集群的每台机器Topic是一类数据的集合
 

Partition


是Topic数据的物理分区

Producer


负责生成数据到kafka的broker中

consumer


 

consumer Group


为consumer指定对应的consumer Group

2.kafka的安装
下载kafka


kafka_2.12-3.2.0kafkakakakakakakakakakakakaka-Java文档类资源-CSDN下载

下载完成后直接解压

tar -zxvf kafka_2.12-3.2.0.tgz

修改kafka配置文件


vi /opt/kafka/config/server.properties

增加3个配置

listeners=PLAINTEXT://10.31.126.100:9092

advertised.listeners=PLAINTEXT://10.31.126.100:9092

zookeeper.connect=10.31.126.100:2181

启动kafka
1.先启动zookeeper服务

cd /opt/kafka

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
 

开启以后,不要关闭当前窗口,不然的话zookeeper服务会中断

ps: 如果报错:/opt/kafka/bin/kafka-run-class.sh: line 342: /opt/kafka/echo/bin/java: No such file or directory

可以看下 echo $JAVA_HOME 是不是路径打印不出。

解决方案: 执行    source /etc/profile ,再打印下  echo $JAVA_HOME,看下是否正常。

2.开启kafka服务

cd /opt/kafka

./bin/kafka-server-start.sh ./config/server.properties

开启以后,不要关闭当前窗口,不然的话kafka服务会中断

3.测试kafka

bin/kafka-topics.sh --create --bootstrap-server 10.31.126.10:9092 --replication-factor 1 --partitions 1 --topic wordTest

ps:在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。
 

2.2一下版本:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication 1 --partitions 1 --topic wordsendertest

来启动
 

查看topic列表


bin/kafka-topics.sh --bootstrap-server 10.31.126.10:9092 --list

查看对应Topic描述: 


bin/kafka-topics.sh --describe --bootstrap-server 10.31.126.10:9092 --topic wordTest

删除topic


bin/kafka-topics.sh -delete --bootstrap-server 10.31.126.10:9092 --topic wordTest

可以开启多个生产者客户端发送消息,开启多个消费者客户端接收消息,以观察这种订阅-发布模式实现的消息队列。

生产者客户端:

bin/kafka-console-producer.sh --bootstrap-server 10.31.126.10:9092 --topic wordTest

打开消费者客户端:

从当前时间点,开始取数据

bin/kafka-console-consumer.sh --bootstrap-server 10.31.126.10:9092 --topic wordTest

从最开始的时间点,开始取数据,(取该topic所有的数据)

bin/kafka-console-consumer.sh --bootstrap-server 10.31.126.10:9092 --topic wordTest --from-beginning

关闭kafka

bin/kafka-server-stop.sh

开启生产者和消费者 

3.kafka数据源,flink程序

编写flink程序,消费kafka数据

核心的生成kafka数据源方法介绍:

 其中SimpleStringSchema,是因为kafka是纯字节存储,所以需要在代码中进行反序列化成对象,让Scala可以解析。

设置offset的几种方式

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}

import java.util.Properties


object kafkaTest {
  def main(args: Array[String]): Unit = {

    //create the env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //for kafka connection
    val kafkaProps = new Properties()

    //kafka's attribute
    kafkaProps.setProperty("bootstrap.servers","10.31.126.10:9092")

    //set the consumer's group
    //kafkaProps.setProperty("group.id","group1")

    //create the consumer
    val kafkaSource = new FlinkKafkaConsumer[String]("wordTest", new SimpleStringSchema, kafkaProps)

    //set offset
    kafkaSource.setStartFromEarliest()

    //auto commit offset
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    //band data source
    val stream = env.addSource(kafkaSource)

    stream.print()

    //execute
    env.execute()

  }
}

输出效果:

 

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1039376.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号