For Kafka installation and configuration, see: https://blog.csdn.net/weixin_35757704/article/details/120488287
- First, create a topic in Kafka named mytesttopic. From the Kafka directory, run:

```shell
./bin/kafka-topics.sh --create --topic mytesttopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
```

Then start a console producer:

```shell
./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
```
- Next, add the Kafka connector dependency to pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
```
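The connector artifact alone is not enough to compile the program; the core Flink streaming dependency is needed as well, and the connector version should match your Flink version. A minimal sketch, assuming Flink 1.13.2 with Scala 2.12 to match the connector:

```xml
<!-- assumed: Flink 1.13.2 / Scala 2.12, matching the connector version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
```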
- The Java code is as follows:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaStreamWordCount {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        // Kafka broker address: either hostname:port or ip:port
        properties.setProperty("bootstrap.servers", "192.168.99.5:9092");
        // the key is always "group.id"; the value is a consumer group ID of your choosing
        properties.setProperty("group.id", "test");

        DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
        // careful: this must exactly match the Kafka topic name created above
        String topic = "mytesttopic";

        DataStream<String> text = env.addSource(
                new FlinkKafkaConsumer<>(topic, deserializationSchema, properties));
        text.print();
        env.execute("Flink-Kafka demo");
    }
}
```
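The class is named KafkaStreamWordCount, but the demo above only prints the raw messages. The counting step that a Flink `flatMap`/`keyBy`/`sum` pipeline would perform on the stream can be sketched in plain Java (a hypothetical helper for illustration, not part of the Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Tokenize each line on whitespace and accumulate per-word counts,
    // mirroring what the Flink pipeline would do record by record.
    static Map<String, Integer> countWords(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords("hello kafka", "hello flink");
        System.out.println(counts); // "hello" appears twice, the others once
    }
}
```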
- Edit the hosts file (not needed if Flink and Kafka run on the same machine).

For example, my VM's hostname is ubuntu and its IP is 192.168.99.5, so I add this line to the hosts file:

```
192.168.99.5 ubuntu
```

Note that even when you configure `properties.setProperty("bootstrap.servers", "192.168.99.5:9092");` with ip:port, the hosts entry is still required, because the broker advertises itself to clients by hostname (you can see `ubuntu:9092` in the consumer logs).
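A quick way to check whether your client machine can resolve the broker's hostname is a plain lookup. A minimal sketch, assuming the hostname `ubuntu` from the example above (replace it with your own broker's hostname):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostCheck {
    // Returns true if the hostname resolves on this machine,
    // i.e. the hosts entry (or DNS record) is in place.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "ubuntu" is the example hostname from this post
        System.out.println("ubuntu resolves: " + resolves("ubuntu"));
        System.out.println("localhost resolves: " + resolves("localhost"));
    }
}
```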
- Run the Java program, then type anything into the Kafka producer; it will show up in Flink's output:

```
xq@ubuntu:~/Desktop/software/kafka_2.12-2.7.1$ ./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
>hello kafka
>hello flink
>
```
The Flink console shows:

```
.........
16:52:44,055 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-test-26, groupId=test] Cluster ID: HDij23gxR_edwXhIDqE9ng
16:52:44,056 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Discovered group coordinator ubuntu:9092 (id: 2147483647 rack: null)
16:52:44,075 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Setting offset for partition mytesttopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ubuntu:9092 (id: 0 rack: null), epoch=0}}
16> hello kafka
16> hello flink
```