hadoop3.x mapreduce案例:统计每个用户包含指定关键字的记录(1)(本机运行) 作者:马育民 • 2025-12-05 12:56 • 阅读:10008 # 介绍 数据如下: [](https://www.malaoshi.top/upload/0/0/1GW2LmseDZFu.png) 详细说明、下载见 [链接](https://www.malaoshi.top/show_1GW2LniEKBE6.html "链接") ### 过滤要求 统计的是每个用户,搜索 `电影` 的记录: 1. 根据 `tab` 制表符拆分每行数据,结果是数组 2. 如果数组有 `6` 个元素,且第 `3` 个元素中包含 `电影`,就符合过滤要求 3. 将符合过滤要求的 第 `2` 个元素 和 第 `3` 个元素记录到结果文件中 **注意:**数据文件第 `2` 列是 用户`UID`,同一个用户,每搜索一次 `电影`,就会存在一行记录,所以会存在重复的 `UID` # 工程 在 [hadoop3.x mapreduce 实现wordcount(本机运行)](https://www.malaoshi.top/show_1IXbjGltYlE.html "hadoop3.x mapreduce 实现wordcount(本机运行)") 工程进行操作 # java代码 ### mapper类 ``` package top.malaoshi.filter2; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 继承自 Mapper 类,用于过滤和处理包含特定关键字的数据。 * 泛型解释: * LongWritable:每一行开始位置的偏移量,对应 map() 方法第1个形参类型 * Text:输入数据的值,通常是一行文本数据,对应 map() 方法第2个形参类型 * Text:context写入的key的数据类型 * Text:context写入的value的数据类型 */ public class FilterMapper extends Mapper { // hadoop 实现序列化好处之二:复用同一个对象 private Text k = new Text(); private Text v = new Text(); /** * map 方法是MapReduce的核心部分之一,它接收输入的数据并进行处理。 * * @param key 是每一行开始位置的偏移量 * @param value 输入数据的值,通常是一行文本数据。 * @param context 上下文对象,用于写入中间键值对。 */ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将输入的值转换为字符串,并以制表符分割成数组 // 必须通过下面方式转成字符串,否则可能中文乱码 String newValue = new String(value.getBytes(),"utf-8"); String[] str = newValue.split("\t"); // 检查分割后的数组长度是否为6 if (str != null && str.length == 6) { // 提取关键词和用户ID String uid = str[1]; // 用户ID位于第二个位置 String keyword = str[2]; // 关键词位于第三个位置 // 使用正则表达式检查关键词是否包含数字 if (keyword.contains("电影")) { // 如果关键词包含电影 // 输出作为中间键值对 k.set(uid); v.set(keyword); context.write(k, v); } } } } ``` ### reduce类 ``` package top.malaoshi.filter2; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * 继承自 Reducer 类 * 泛型解释: * Text:Mapper 输出的key类型,也对应 reduce() 方法第1个形参类型 * Text:Mapper 输出的 value 类型,对应 reduce() 方法第2个形参类型 * Text:context写入的key的数据类型 * Text:context写入的value的数据类型 */ public class FilterReduce extends Reducer { Text v = new Text(); @Override protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { StringBuilder stringBuilder = new StringBuilder(); for(Text item:values){ // 注意:这里用下面方式转成字符串,否则数据错误 String newValue = new String(item.getBytes(),0,item.getLength(),"utf-8"); stringBuilder.append(newValue); stringBuilder.append(","); } int len = stringBuilder.length(); stringBuilder.delete(len - 1, len ); v.set(stringBuilder.toString()); context.write(key,v); } } ``` ### 主启动类-driver ``` package top.malaoshi.filter2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class Main { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 检查命令行参数,确保提供了输入和输出路径 if (args != null && args.length == 2) { // 创建Hadoop Configuration对象,用于加载配置信息 Configuration conf = new Configuration(); // 创建一个Job实例,指定任务名"UIDFilter" Job job = new Job(conf, "统计每个用户包含指定关键字的记录"); // 设置主类,以便在执行时能够找到该类。这允许Hadoop以jar包形式运行此程序 job.setJarByClass(Main.class); // 设置Map阶段使用的Mapper类 job.setMapperClass(FilterMapper.class); // 设置Reduce阶段使用的reduce类 job.setReducerClass(FilterReduce.class); // 设置 Mapper 输出key-value对的数据类型,必须与 Mapper 泛型一致 // 与 job.setOutputKeyClass()、job.setOutputValueClass() 一致时可略 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置输出key-value对的数据类型,必须与 Reducer 泛型一致 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 添加输入路径到job中 FileInputFormat.addInputPath(job, new Path(args[0])); // 设置输出路径到job中,注意:该目录不应存在 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交job,并等待完成。根据job是否成功完成返回状态码 System.exit(job.waitForCompletion(true) ? 0 : 1); } else { // 如果没有提供正确的参数,则打印错误信息 System.err.println(" wordcount "); } } } ``` # 本地调试 1. 在开发机器上运行,如:windows 2. 读取的数据 和 输出结果 在 **本地** 上 **注意:**需要搭建开发环境 [hadoop3.x HDFS JAVA开发介绍](https://www.malaoshi.top/show_1IXaEESC2g9.html "hadoop3.x HDFS JAVA开发介绍") # 准备数据 数据下载见 [链接](https://www.malaoshi.top/show_1GW2LniEKBE6.html "链接") 下载后,将数据文件放在 `D:\bigdata_std\mapreduce\wordcount\wordcount\data_filter` 目录 # idea 操作 ### 设置启动参数 在主启动类点右键,如下操作: [](https://www.malaoshi.top/upload/pic/hadoop/QQ20210217111512.png) [](https://www.malaoshi.top/upload/0/0/1GW2LlH3Z36p.png) 红框处是:`D:\bigdata_std\mapreduce\wordcount\wordcount\data_filter D:\bigdata_std\mapreduce\wordcount\wordcount\result_filter ` **解释:** 中间有空格,这是2个参数 - 第一个参数:读取数据所在的目录 - 第二个参数:生成结果的目录,执行前,`result_filter` 目录必须删除,否则报错 ### 运行 然后 `run` 或 `debug` 运行 ### 执行结果 [](https://www.malaoshi.top/upload/0/0/1GW2LlMVND17.png) 内容如下: [](https://www.malaoshi.top/upload/0/0/1GW2LprUolpH.png) 原文出处:http://malaoshi.top/show_1GW2LnzoRfly.html