Flume 1.9.0: monitor a local file and save it to Hive (Thrift) Author: 马育民 • 2021-10-23 00:09

# Overview

Connect to Hive through `Thrift` and write the collected data into Hive.

**This requires a bucketed transactional table; the approach is not recommended and was not completed here.**

[Introduction to Thrift](https://www.malaoshi.top/show_1IX25nWmAXZN.html "Introduction to Thrift")

[Hive's Thrift service and HiveServer2](https://www.malaoshi.top/show_1IX25nc4KedX.html "Hive's Thrift service and HiveServer2")

# Hive environment prerequisites

### Start the metastore (metadata) service

https://malaoshi.top/show_1IXjyrb5E3w.html

### HiveServer2

Configure HiveServer2; Flume writes data to Hive through this service.

https://www.malaoshi.top/show_1IXjzG9X6AM.html

### Create the table

This table stores the collected data.

```
cd /program/apache-hive-3.1.2/bin
```

```
./hive
```

```
CREATE TABLE t_log (datetime string,ip string,msg string) STORED AS ORC TBLPROPERTIES ('transactional' = 'true');
```

This should be a bucketed, ORC-format transactional table. Note that the statement above does not declare any buckets; a `CLUSTERED BY ... INTO n BUCKETS` clause would be needed to make it a bucketed table.

# Flume configuration

```
# Define the agent name as a1 (like a variable name; it can be anything)
# Name the sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source type as taildir
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
# Monitor this file
a1.sources.r1.filegroups.f1 = /test_flume/1.log

# Configure the sink
a1.sinks.k1.type = hive
# Hive metastore address
a1.sinks.k1.hive.metastore = thrift://hadoop1:9083
# Database name
a1.sinks.k1.hive.database = default
# Table name
a1.sinks.k1.hive.table = t_log
# Partition
# a1.sinks.k1.hive.partition = ,
# Whether to use the local timestamp
#a1.sinks.k1.useLocalTimeStamp = true
# Serializer: DELIMITED splits the event body by a delimiter
a1.sinks.k1.serializer = DELIMITED
# Delimiter
a1.sinks.k1.serializer.delimiter = "|"
#a1.sinks.k1.serializer.serdeSeparator = '|'
# Field names, matching the Hive table columns
a1.sinks.k1.serializer.fieldnames = datetime,ip,msg
a1.sinks.k1.batchSize = 100

# Configure the channel type as memory
a1.channels.c1.type = memory
# The in-memory queue can hold at most 1000 events
a1.channels.c1.capacity = 1000
# At most 100 events are taken from the source or given to the sink in one transaction
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

### Common error

```
[ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:469)] Sink k1 has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: k1, batch size = 15000, channel c1, transaction capacity = 100
```

Cause: the channel and sink settings do not match; the sink's batch size is greater than the channel's transaction capacity.

Solution: set `a1.sinks.k1.batchSize` to a value less than or equal to 100, for example:

```
a1.sinks.k1.batchSize = 100
a1.channels.c1.transactionCapacity = 100
```

# Hive-related jar dependencies

Copy the jars under `/program/apache-hive-3.1.2/hcatalog/share/hcatalog/` into `/program/apache-flume-1.9.0-bin/lib/`:

```
cp /program/apache-hive-3.1.2/hcatalog/share/hcatalog/* /program/apache-flume-1.9.0-bin/lib/
```

# Start

Change into the Flume root directory:

```
cd /program/apache-flume-1.9.0-bin/
```

Run the following command (it must be executed from the directory above):

```
bin/flume-ng agent -n a1 -c conf -f conf/my_hive.conf -Dflume.root.logger=INFO,console
```

**Explanation:**

https://www.malaoshi.top/show_1IX258AKgb2F.html

### Result

A large amount of output is printed; if you see lines like the following, the agent started successfully:

```
2021-10-20 22:59:04,284 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2021-10-20 22:59:04,285 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
```

# Testing

### Monitoring the 1.log file

Edit the `/test_flume/1.log` file and append the following line (shell sketches for this step and the query step are included at the end of this post):

```
2021-10-01 09:10:40 192.168.1.10 clicked the iphone13 ad
```

Note that the sink splits fields on `|`, so for the record to map onto the `datetime`, `ip` and `msg` columns, the fields would need to be separated by `|` rather than spaces.

This shows that Flume has successfully detected the new content in `1.log`.

### Query the table in Hive

In the `hive` CLI, query the table:

```
select * from t_log;
```

Original source: http://malaoshi.top/show_1IX25qvNZG00.html
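### Supplementary shell sketches

The monitored file must exist before the agent is started. A minimal preparation sketch, assuming the `/test_flume/1.log` path from the source configuration above:

```
# Create the directory and empty file watched by the TAILDIR source
mkdir -p /test_flume
touch /test_flume/1.log
```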
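Because the sink's serializer splits the event body on `|`, a test record in that format could be appended like this (the field values are only examples):

```
# Append one record whose fields match the configured "|" delimiter
echo '2021-10-01 09:10:40|192.168.1.10|clicked the iphone13 ad' >> /test_flume/1.log
```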
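The query step can also be run without opening the interactive `hive` shell; the path below assumes the same Hive installation directory used earlier:

```
# Run the query non-interactively
/program/apache-hive-3.1.2/bin/hive -e "select * from t_log;"
```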
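If HiveServer2 is running (as set up in the prerequisites section), the same check can be done through `beeline`; port 10000 is HiveServer2's default and is an assumption, not taken from the original article:

```
# Query through HiveServer2 (default port 10000 assumed; adjust to your setup)
/program/apache-hive-3.1.2/bin/beeline -u jdbc:hive2://hadoop1:10000 -e "select * from t_log;"
```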