登录    关于
马育民老师的博客

马育民的博客

QQ:65242847

flume1.9.0 监控本地文件,上传到HDFS

说明

监控一个目录的文件,当文件发生变化是,将内容发送到 HDFS

检查 hadoop 环境

Flume 将数据输出到 HDFS,依赖 Hadoop 相关 jar 包

如果 flume 与 hadoop 安装在一台服务器上,集成较为简单

检查 /etc/profile.d/bigdata_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确

# 配置JAVA_HOME
export JAVA_HOME=/program/jdk1.8.0_202
export PATH=${JAVA_HOME}/bin:$PATH

# 配置 HADOOP_HOME
export HADOOP_HOME=/program/hadoop-3.0.3
export PATH=${HADOOP_HOME}/bin:$PATH
export PATH=${HADOOP_HOME}/sbin:$PATH

Taildir Source

参见:
https://www.malaoshi.top/show_1IX24u2g3aa2.html

HDFS

官方文档:
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#hdfs-sink

常用参数解释:

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop1:8020/flume/logs/%Y%m%d/
# 路径配置时间后,此处必须为true
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 不压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 文件后缀
a1.sinks.k1.hdfs.fileSuffix = .log
# 间隔 30s 后,如果有数据,就新建文件
a1.sinks.k1.hdfs.rollInterval = 60
# 如果写入内容达到120m(要低于hadoop的128m),就新建文件
a1.sinks.k1.hdfs.rollSize = 125829120

创建测试目录

登录 hadoop1 ,创建以下目录,用于测试:

/test_flume

编写配置文件

登录 hadoop1

cd /program/apache-flume-1.9.0-bin/conf
vim my_hdfs.conf

内容如下:

# 定义agent名字为a1,相当于变量名,可任意起
# 设置 sources、sinks、channels的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为taildir
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2

# 监控该文件
a1.sources.r1.filegroups.f1 = /test_flume/1.log
# 监控该目录下所有 .log 后缀的文件
a1.sources.r1.filegroups.f2 = /test_flume/logs/.*log

# 配置 sink 
a1.sinks.k1.type = hdfs
# 文件夹有年月日组成
a1.sinks.k1.hdfs.path = hdfs://hadoop1:8020/flume/logs/%Y%m%d/
# 文件夹配置时间后,此处必须为true
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 不压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 文件后缀
a1.sinks.k1.hdfs.fileSuffix = .log
# 间隔 60s 后,如果有数据,就新建文件
a1.sinks.k1.hdfs.rollInterval = 60
# 如果写入内容达到120m(要低于hadoop的128m),就新建文件
a1.sinks.k1.hdfs.rollSize = 125829120


# 配置 channel 类型为内存
a1.channels.c1.type = memory

# 内存队列最大为1000,可以存储1000个event
a1.channels.c1.capacity = 1000

# 一个事务中从source接收的events数量或发送给sink的events数量最大为100
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动

进入 flume 根目录:

cd /program/apache-flume-1.9.0-bin/

执行下面命令(必须在上面目录中才能执行):

bin/flume-ng agent -n a1 -c conf -f conf/my_hdfs.conf -Dflume.root.logger=INFO,console

解释:
https://www.malaoshi.top/show_1IX258AKgb2F.html

执行结果

会打印一大堆信息,看到下面字样,表示启动成功:

2021-10-20 22:59:04,284 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentati                                                                                                 on.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter gro                                                                                                 up for type: SOURCE, name: r1: Successfully registered new MBean.
2021-10-20 22:59:04,285 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentati                                                                                                 on.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, n                                                                                                 ame: r1 started

测试: 监控 1.log 文件

修改 /test_flume/1.log 文件:

增加:lucy

控制台会打印:

2021-10-20 23:03:49,447 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6C 75 63 79                                     lucy }

说明 flume 成功监控到 1.log 文件内容的编号


原文出处:https://malaoshi.top/show_1IX258Tc7AHi.html