Flink批处理:统计一个文件中各个单词出现的次数,把统计结果输出到文件。
新建maven项目
- 引入依赖
批处理快速应用(Maven依赖):
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.15.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.15.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.1</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
```
package com.tong; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountBatch { public static void main(String[] args) throws Exception { //定义输入输出路径 String input = "D:\in\123.txt"; String output = "D:\out\123.csv"; //1.获取flink的运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.用flink的运行环境,去获取待分析数据 DataSourcetextValue = env.readTextFile(input); //3.处理数据,使用flatMap方法进行聚合处理 DataSet > dataSet = textValue.flatMap(new SplitValue()) //b.相同单词聚合到一起,0指的是Tuple2 的String位置,如(hello,1)中的hello的位置0 .groupBy(0) //c.聚合到的数据累加处理,累加处理的的数据,即就是(hello,1)中的1的位置 .sum(1); //4.保存处理结果,写入到csv中 dataSet.writeAsCsv(output,"n"," ").setParallelism(1); //5.触发执行程序 env.execute("wordCount batch process"); } static class SplitValue implements FlatMapFunction > { //a.将文本内容打散成一个一个单词 @Override public void flatMap(String line, Collector > collector) throws Exception { //数据按照空格进行切分 String[] words = line.split(" "); for (String word : words) { //将单词拼接成(hello,1)这种样式,方便之后就行聚合计算 collector.collect(new Tuple2<>(word, 1)); } } } }
执行完后的结果如下:
package com.tong; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountStream { public static void main(String[] args) throws Exception { String ip="127.0.0.1"; int port = 7777; //获取流式数据的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcetextStream = env.socketTextStream(ip, port); SingleOutputStreamOperator > dataStream = textStream.flatMap(new FlatMapFunction >() { @Override public void flatMap(String line, Collector > collector) throws Exception { //数据按照空格进行切分 String[] words = line.split(" "); for (String word : words) { //将单词拼接成(hello,1)这种样式,方便之后就行聚合计算 collector.collect(new Tuple2<>(word, 1)); } } }); SingleOutputStreamOperator > word = dataStream.keyBy(0).sum(1); word.print(); env.execute("wordCount stream process"); } }
流处理模仿小工具
netcat(nc)下载地址:https://eternallybored.org/misc/netcat/
启动使用:https://blog.csdn.net/q_a_z_w_s_x___/article/details/115327163
运行结果如下