hadoop3.x MapReduce Combiner案例 作者:马育民 • 2021-11-08 13:21 • 阅读:10216 上接:[hadoop3.x MapReduce Combiner作用](https://www.malaoshi.top/show_1IX2Baovg8xf.html "hadoop3.x MapReduce Combiner作用") # 案例 本案例是在 wordcount 基础之上增加内容 # 增加 MyCombiner 增加 `MyCombiner` ,该类继承 `Reducer`,注意泛型,这里的 泛型与 `MyReducer` 完全相同 ``` import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 相当于在Mapper实现了reduce方法 * 所以与Reducer写法一致 * */ public class MyCombiner extends Reducer { private IntWritable value=new IntWritable(); @Override protected void reduce(Text arg0, Iterable arg1, Reducer.Context arg2) throws IOException, InterruptedException { int count=0; for(IntWritable item : arg1){ count+=item.get(); } value.set(count); arg2.write(arg0, value); } } ``` ### Combiner VS reducer 在本例中,`MyCombiner` 代码与 `MyReducer` 代码完全相同 对于简单功能来说,2个类的代码几乎是相同的 ### 既然代码相同,可以不设置 MyReducer 吗? 在本例中,`MyCombiner` 代码与 `MyReducer` 代码完全相同,但必须设置 `MyReducer`,当原文件非常大时,分成几个 **切片**,`MyCombiner` 处理的是 **某一个切片**,最后需要由 `MyReducer` 汇总统计 # 修改 Main2 设置Combiner ``` job.setCombinerClass(MyCombiner.class); ``` ##### 完整代码 ``` import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Main2 extends Configured implements Tool{ public static void main(String[] args) throws Exception { Configuration config = new Configuration(); //通过ToolRunner类 执行 覆盖的run()方法 int res=ToolRunner.run(config, new Main2(), args); //退出客户端 System.exit(res); } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(),"mywordcount2"); //设置jar加载路径,不加此代码有时可能会报错 job.setJarByClass(Main2.class); //设置map和reduce类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //设置map输出 key/value 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置最终输出 key/value 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置Combiner job.setCombinerClass(MyCombiner.class); //根据输入参数 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交 // job.submit(); boolean result = job.waitForCompletion(true); return result?1:0; } } ``` # 执行结果 ### 打印内容 [![](https://www.malaoshi.top/upload/pic/hadoop/Snipaste_2021-11-08_13-34-53.png)](https://www.malaoshi.top/upload/pic/hadoop/Snipaste_2021-11-08_13-34-53.png) **解释:** 说明执行了combine,combine输入5条记录,输出2条记录 如果是大文件,效果更明显 ### 生成结果文件 生成结果文件与之前相同 # 测试:不执行 reduce 修改 `Main2` 的 `run()`,增加下面代码,让其不执行 reduce ``` job.setNumReduceTasks(0); ``` ### 打印内容 打印如下: [![](https://www.malaoshi.top/upload/pic/hadoop/Snipaste_2021-11-08_14-12-35.png)](https://www.malaoshi.top/upload/pic/hadoop/Snipaste_2021-11-08_14-12-35.png) 只打印 map 相关信息,是因为只执行了 map task ### 生成结果文件 生成文件名字是:`part-m-00000`,文件名中间是 `m`,因为只执行了 map task 内容如下: ``` 李雷 1 lucy 1 lucy 1 李雷 1 lucy 1 ``` 说明如果不执行 `reduce task`,也就不会执行 `shuffle`,也就不会执行 `Combiner` 原文出处:http://malaoshi.top/show_1IX2Boshfl2W.html