栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

Flink快速应用(批、流一体)简单实现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink快速应用(批、流一体)简单实现

Flink快速应用

Flink批处理:统计一个文件中各个单词出现的次数,把统计结果输出到文件。

新建maven项目

  • 引入依赖
        
            org.apache.flink
            flink-clients
            1.15.1
        

        
        
            org.apache.flink
            flink-java
            1.15.1
        

        
        
            org.apache.flink
            flink-streaming-java
            1.15.1
            compile
        
批处理快速应用
package com.tong;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountBatch {

    public static void main(String[] args) throws Exception {

        //定义输入输出路径
        String input = "D:\in\123.txt";
        String output = "D:\out\123.csv";

        //1.获取flink的运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.用flink的运行环境,去获取待分析数据
        DataSource textValue = env.readTextFile(input);
        //3.处理数据,使用flatMap方法进行聚合处理
        DataSet> dataSet = textValue.flatMap(new SplitValue())
                //b.相同单词聚合到一起,0指的是Tuple2的String位置,如(hello,1)中的hello的位置0
                .groupBy(0)
                //c.聚合到的数据累加处理,累加处理的的数据,即就是(hello,1)中的1的位置
                .sum(1);
        //4.保存处理结果,写入到csv中
        dataSet.writeAsCsv(output,"n"," ").setParallelism(1);
        //5.触发执行程序
        env.execute("wordCount batch process");

    }

    
    static class SplitValue implements FlatMapFunction> {
        //a.将文本内容打散成一个一个单词
        @Override
        public void flatMap(String line, Collector> collector) throws Exception {
            //数据按照空格进行切分
            String[] words = line.split(" ");

            for (String word : words) {
                //将单词拼接成(hello,1)这种样式,方便之后就行聚合计算
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

执行完后的结果如下:

流处理快速应用
package com.tong;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class WordCountStream {


    public static void main(String[] args) throws Exception {

        String ip="127.0.0.1";
        int port = 7777;
        //获取流式数据的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource textStream = env.socketTextStream(ip, port);

        SingleOutputStreamOperator> dataStream = textStream.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String line, Collector> collector) throws Exception {
                //数据按照空格进行切分
                String[] words = line.split(" ");

                for (String word : words) {
                    //将单词拼接成(hello,1)这种样式,方便之后就行聚合计算
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        SingleOutputStreamOperator> word = dataStream.keyBy(0).sum(1);

        word.print();
        env.execute("wordCount stream process");
    }
}

流处理模仿小工具
netcat(nc)下载地址:https://eternallybored.org/misc/netcat/
启动使用:https://blog.csdn.net/q_a_z_w_s_x___/article/details/115327163

运行结果如下

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1036421.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号