栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

Flume组件基本使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flume组件基本使用

Flume

海量日志收集系统,基本进程单元Agent,每台电脑只有一个Agent

核心组件

数据采集因为事务的原因,采集信息会存在指针标志当前采集的索引位置,程序关闭重启后会根据上次索引位置继续读取

Client
  • 生产数据,运行在独立的线程
Agent

包含SourceChannelSink,。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks,

  • 一个独立的flume进程
Source

组件可以收集通过文件/路径/命令获取到的数据,也可以收集到flume传过来的数据。并将收集后的数据整理成event发送给Channel

  • 数据收集组件,数据封装为event传输
Channel

可以接收到Source或者经过拦截器处理后的event数据,类似消息中间件,消息可以临时存储在内存/数据库/文件中

平衡数据传入和传出之间的速率问题

  • event数据缓冲区,消息中间件,基于内存存储掉电易失,基于文件存储不易丢失
Sink
  • 从Channel中获取数据,并传递到下一个Agent或HDFS中
Event

一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)

  • 在Agent内部传输的数据单元
Interceptor

拦截器,可以根据业务需求进行拦截

  • Source和Channel之间的拦截器
多Agent间数据传输 原理
  • 上一个Agent收集到的数据传入下一个Agent中
  • 上游的sink定义发送端接口,下游Source端定义监听的端口,端口信息保持一致,实现信息的发送
优点
  • Channel可以缓冲数据的收集和写入,保持数据传输的平稳
  • Flume的管道是基于事务,保证了数据在传送和接收时的一致性.
  • 容错性高可扩展性强,支持自定义
  • 支持多路径流量,多管道接入流量,多管道接出流量
Flume事务处理 put事务
  • 将批数据先写入临时缓冲区
  • 检查channel内存队列是否足够合并
  • 内存队列空间不足,回滚数据
fetch事务
  • 将数据取到临时缓冲区,并将数据发送到HDFS上
  • 如果数据全部发送成功,则清除临时数据缓冲群
  • 如果发送过程出现异常,rollback将临沭缓冲区中的数据归还给channel内存队列
组件监听类型 Source
  • netcat Source

    • 监听绑定端口输入的内容
  • Avro Source

    • 监听上一个Agent传输过来的数据
  • Exec Source

    • 监听指令之后后产生的内容
  • Spooling Directory Source

    • 监听文件夹中初次创建文件时的信息
  • talldir Source

    • 监听指定文件夹中,模糊匹配的文件的内容
  • Interceptor

    • 拦截器,可以加工传输的数据设置时间戳、KV值、IP等
sink
  • hdfs

    • 将数据转发到HDFS
  • avro

    • 转发到下一个Agent
  • logger

    • 将日志输出到前台
使用案例 文件格式

Source:talldir Source(监听文件信息)

Channel:基于文件存储

sink:logger(打印到前台)

定义demo.conf文件
vim demo.conf
# example.conf: 一个单节点的 Flume 实例配置
# 配置Agent a1各个组件的名称
a1.sources = r1
a1.sinks = k1    
a1.channels = c1 


# 把sink绑定到channel上
a1.sources.r1.channels = c1
# 配置Agent a1的source r1的属性
a1.sources.r1.type = TAILDIR
#定义文件组
a1.sources.r1.filegroups = f1 f2
#定义各文件组监听文件的路径对应文件的格式
a1.sources.r1.filegroups.f1 = /opt/test01/es.*
a1.sources.r1.filegroups.f2 = /opt/test02/cs.*


# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100


# 把sink绑定到channel上
a1.sinks.k1.channel = c1
# 配置Agent a1的sink k1的属性,打印到前台页面
a1.sinks.k1.type = logger

##使用测试
##新建文件(内容自定义)
vim /opt/test01/es01.txt
vim /opt/test01/es02.txt
vim /opt/test01/cs01.txt
vim /opt/test01/cs02.txt

##进入flume中的bin目录
cd /opt/yjx/apache-flume-1.9.0-bin
##启动程序
bin/flume-ng agent -n a1 -c conf/ -f /opt/flume1.conf -Dflume.root.logger=INFO,console
##启动后会读取文件信息打印到前台
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1036689.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号