flume1.9.0 整合hive3.1.x:sink写入hive表 作者:马育民 • 2021-11-30 19:50 • 阅读:10302 # 说明 **缺点:**需要创建 hive 事务表 # 准备 hive 要启动 mysql hive 要启动 metastore https://www.malaoshi.top/show_1IX2HLjutY5g.html # 创建 flume 配置文件 登录 `hadoop1`, ``` cd /program/flume-1.9.0/conf ``` ``` vim test_hive2.conf ``` 内容如下: ``` # 定义agent名字为a1,相当于变量名,可任意起 # 设置 sources、sinks、channels的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source类型为taildir a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 # 监控该文件 a1.sources.r1.filegroups.f1 = /test_flume/hive_student.log # 配置 sink a1.sinks.k1.type = hive # hive metastore 地址 a1.sinks.k1.hive.metastore = thrift://hadoop1:9083 # 库名 a1.sinks.k1.hive.database = default # 表名 a1.sinks.k1.hive.table = t_student # 分区 #a1.sinks.k1.hive.partition = , # 文件夹配置时间后,此处必须为true # a1.sinks.k1.useLocalTimeStamp = true # 过滤策略 a1.sinks.k1.serializer = DELIMITED # 分割符格式 a1.sinks.k1.serializer.delimiter = "|" a1.sinks.k1.serializer.serdeSeparator = '|' # 表中字段 a1.sinks.k1.serializer.fieldnames = name,age,sex # 滚动策略 a1.sinks.k1.round = true a1.sinks.k1.roundValue = 10 a1.sinks.k1.roundUnit = second a1.sinks.k1.batchSize = 100 #=========================================================== # 文件增加1行记录,就提交事务,这里只是为了演示:方便 select 查询结果 a1.sinks.k1.hive.txnsPerBatchAsk=2 # 配置 channel 类型为内存 a1.channels.c1.type = memory # 内存队列最大为1000,可以存储1000个event a1.channels.c1.capacity = 1000 # 一个事务中从source接收的events数量或发送给sink的events数量最大为100 a1.channels.c1.transactionCapacity = 100 # 将source 和 sink 绑定到channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ``` **解释:** - `a1.sinks.k1.hive.txnsPerBatchAsk=2`:`2` 表示,文件增加1行记录,就提交事务,这里只是为了演示:方便 select 查询结果 # hive 创建表 官方推荐,创建分桶事务表,否则 flume 执行报错,错误如下: [![](/upload/0/0/1IX4SCeM4Unf.png)](/upload/0/0/1IX4SCeM4Unf.png) 亲测:仅创建事务表也可以 ### 注意 使用 Hiveserver2 不要使用 spark thrift服务,目前spark底层不支持spark 访问hive事务表,只能获取表结构,不能获取数据 ### 开启事务 必须执行下面2句,开启并发、设置事务管理类,否则建表报错 ``` # 开启 hive 并发 set hive.support.concurrency = true; ``` ``` # 配置事务管理类,默认 set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; ``` **注意:**在 dbeaver 中要一句一句的执行 ### 创建事务表 创建事务表: ``` CREATE TABLE t_student( name string, age string, sex string ) STORED AS ORC TBLPROPERTIES ('transactional' = 'true'); ``` ### 创建分桶事务表 或者创建分桶事务表 ``` CREATE TABLE t_student( name string, age string, sex string ) clustered by (id) into 8 buckets stored as orc TBLPROPERTIES ('transactional'='true'); ``` # 复制 jar 包 hive根目录下的 `/hcatalog/share/hcatalog` 文件夹中的 jar,复制到flume的 `lib` 目录下: ``` cp /program/hive-3.1.2/hcatalog/share/hcatalog/* /program/flume-1.9.0/lib/ ``` # 启动flume 在 `flume-1.9.0` 目录下,执行下面命令: ``` bin/flume-ng agent -n a1 -c conf -f conf/test_hive2.conf -Dflume.root.logger=INFO,console ``` # 测试 执行下面命令,写入文件数据: ``` echo "lili|20|female" >> /test_flume/hive_student.log ``` **注意:** - `"lili|20|female"`:必须加双引号,否则报错 - 用 `|` 作为分隔符 ### 查询 在会话中要先执行下面代码,否则 执行 `select` 时报错 ``` # 开启 hive 并发 set hive.support.concurrency = true; # 配置事务管理类,默认 set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; ``` 查询表: ``` select * from t_student ``` 相关文章: https://zhuanlan.zhihu.com/p/62904645 https://blog.csdn.net/olive18/article/details/85306445 https://www.cnblogs.com/zuizui1204/p/7800226.html 原文出处:http://malaoshi.top/show_1IX2KG8e1dKR.html