登录    关于
马育民老师的博客

马育民的博客

QQ:65242847

flume1.9.0 Taildir Source 多日志文件监控,打印到控制台

说明

监控一个目录的文件,当文件发生变化是,将内容打印到控制台中

Taildir Source

通过 Taildir Source 实现此功能

Taildir Source 监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。

Taildir Source是可靠的,即使发生文件轮换(译者注1)也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。

Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。

文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。

Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。

官方文档

http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#taildir-source

参数解释

  • channels:与Source绑定的channel,多个用空格分开

  • type:组件类型,这个是: TAILDIR.

  • filegroups:被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔

  • filegroups.<filegroupName>:被监控文件夹的 绝对路径。支持:正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名

  • positionFile:默认值 ~/.flume/taildir_position.json 用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件是JSON格式。

创建测试目录

登录 hadoop1 ,创建以下目录,用于测试:

/test_flume
/test_flume/logs/

编写配置文件

登录 hadoop1

cd /program/apache-flume-1.9.0-bin/conf
vim my.conf

内容如下:

# 定义agent名字为a1,相当于变量名,可任意起
# 设置 sources、sinks、channels的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source类型为taildir
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2

# 监控该文件
a1.sources.r1.filegroups.f1 = /test_flume/1.log
# 监控该目录下所有 .log 后缀的文件
a1.sources.r1.filegroups.f2 = /test_flume/logs/*.log

# 配置 sink 为打印到控制台中
a1.sinks.k1.type = logger


# 配置 channel 类型为内存
a1.channels.c1.type = memory

# 内存队列最大为1000,可以存储1000个event
a1.channels.c1.capacity = 1000

# 一个事务中从source接收的events数量或发送给sink的events数量最大为100
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动

进入 flume 根目录:

cd /program/apache-flume-1.9.0-bin/

正常启动

执行下面命令(必须在上面目录中才能执行):

bin/flume-ng agent -n a1 -c conf -f conf/my.conf -Dflume.root.logger=INFO,console

解释:

  • agent:运行一个 agent
  • -c conf:指定配置目录
  • -f dir-log.conf:指定要运行的配置文件
  • -n a1:配置文件中 agent 的名字
  • -Dflume.root.logger=INFO,console:将信息打印到控制台中

后台启动

nohup bin/flume-ng agent -n a1 -c conf -f conf/my.conf -Dflume.root.logger=INFO,console &

执行结果

会打印一大堆信息,看到下面字样,表示启动成功:

2021-10-20 22:59:04,284 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentati                                                                                                 on.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter gro                                                                                                 up for type: SOURCE, name: r1: Successfully registered new MBean.
2021-10-20 22:59:04,285 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentati                                                                                                 on.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, n                                                                                                 ame: r1 started

测试: 监控 1.log 文件

修改 /test_flume/1.log 文件:

增加:lucy

控制台会打印:

2021-10-20 23:03:49,447 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6C 75 63 79                                     lucy }

说明 flume 成功监控到 1.log 文件内容的编号

测试:监控 logs 目录

进入 logs 目录

cd /test_flume/logs/

创建文件

touch a.log

flume控制台打印:

2021-10-20 23:21:39,933 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:290)] Opening file: /test_flume/logs/a.log, inode: 10082285, pos: 0

说明监控到该目录有新建文件

修改文件

向文件添加 java,执行下面命令:

echo java >> a.log

flume控制台打印:

2021-10-20 23:23:24,977 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6A 61 76 61                                     java }

说明监控到该文件内容变化


原文出处:https://malaoshi.top/show_1IX24u2g3aa2.html