hadoop3.x MapReduce - Sorting

Author: 马育民 • 2021-04-11 23:03 • Views: 10229

# Overview

MapReduce sorts records twice during the shuffle phase and once more in the reduce phase.

MapReduce has a **default sort rule**: records are sorted by `key` in [dictionary order](https://www.malaoshi.top/show_1IX1Ke7hkJYn.html "dictionary order"). The reduce phase relies on this key sort to group records that share the same key.

# Example

Charging-station data:

```
1,100,2020-01-01 09:30:50,2020-01-01 09:40:50
2,200,2020-01-11 10:40:50,2020-01-11 11:05:50
3,150,2020-01-01 11:20:50,2020-01-01 11:32:50
1,300,2020-01-03 14:30:50,2020-01-03 15:20:10
1,50,2020-02-02 12:30:50,2020-02-02 12:35:50
2,500,2020-02-02 15:30:30,2020-02-02 17:20:30
3,150,2020-02-01 11:20:50,2020-02-01 11:32:50
3,150,2020-02-03 11:20:50,2020-02-03 11:32:50
1,100,2020-03-01 09:30:50,2020-03-01 09:40:50
1,100,2020-03-05 09:30:50,2020-03-05 09:40:50
2,500,2020-03-12 15:30:30,2020-03-12 17:30:30
2,500,2020-03-22 15:30:30,2020-03-22 17:30:30
3,400,2020-03-02 17:30:20,2020-03-02 18:20:20
4,100,2020-04-01 11:20:50,2020-04-01 11:25:50
4,50,2020-04-21 11:20:50,2020-04-21 11:28:50
5,150,2020-04-11 11:20:50,2020-04-11 11:32:50
6,150,2020-04-15 11:20:50,2020-04-15 11:40:50
```

Task: for each charging pile, compute the total charge amount and usage count, then sort by charge amount **in descending order**.

# Approach

### key

Since MapReduce sorts by `key`:

- put the **column** to sort on into the `key`
- wrap each record in a `bean` and use that `bean` as the `key`

### Implement the WritableComparable interface

- The bean implements the `WritableComparable` interface, which extends both `Writable` and `Comparable`.

# Implementation steps

The aggregation and the sort cannot be done in a single MapReduce job.

So first run [hadoop3.x MapReduce serialization example - computing charge amount, charging time, and session count per charging pile](https://www.malaoshi.top/show_1IX2B9zmE0SE.html "hadoop3.x MapReduce serialization example - computing charge amount, charging time, and session count per charging pile"), which produces:

```
1 1,5,650,84
2 2,4,1700,375
3 3,4,850,86
4 4,2,150,13
5 5,1,150,12
6 6,1,150,20
```

Then run a second MapReduce job that reads the result file above and writes it back out in sorted order.

### Do not generate multiple files

The first job must not produce multiple output files; otherwise the second job would have to re-aggregate each pile's **charge amount** across the files when reading them.

# Java code

### Modifying the Cdz class

For details on **sorting**, see: [java Comparable sorting and secondary sorting](https://www.malaoshi.top/show_1IX2DO38AerI.html "java Comparable sorting and secondary sorting")

```
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Cdz implements
WritableComparable<Cdz> {

    // charging pile id
    private String id;
    // number of charging sessions
    private int num;
    // total charge amount
    private int eleNum;
    // total charging time, in minutes
    private int time;

    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        // fields must be written in the same order they are read,
        // otherwise the data is corrupted on deserialization
        out.writeUTF(id);
        out.writeInt(num);
        out.writeInt(eleNum);
        out.writeInt(time);
    }

    // deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in the same order they were written
        id = in.readUTF();
        num = in.readInt();
        eleNum = in.readInt();
        time = in.readInt();
    }

    // format of each line in the result file
    @Override
    public String toString() {
        return id + "," + num + "," + eleNum + "," + time;
    }

    public int getNum() { return num; }
    public void setNum(int num) { this.num = num; }
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getTime() { return time; }
    public void setTime(int time) { this.time = time; }
    public int getEleNum() { return eleNum; }
    public void setEleNum(int eleNum) { this.eleNum = eleNum; }

    @Override
    public int compareTo(Cdz o) {
        // if this object's charge amount is larger than the other's,
        // this object sorts first, so return -1
        if (this.eleNum > o.eleNum) {
            return -1;
        } else if (this.eleNum < o.eleNum) {
            return 1;
        }
        return 0;
    }
}
```

### mapper

```
package sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Cdz, Text> {

    private Text idText = new Text();
    private Cdz cdz = new Cdz();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read one line
        String line = value.toString();
        // split key and value on the tab written by the previous job
        String[] array = line.split("\t");
        // split the value part on ","
        String[] array2 = array[1].split(",");
        String id = array2[0];
        // number of charging sessions
        int num = Integer.parseInt(array2[1]);
        // charge amount
        int eleNum = Integer.parseInt(array2[2]);
        // charging time
        int time = Integer.parseInt(array2[3]);
        // wrap into the bean
        cdz.setId(id);
        cdz.setNum(num);
        cdz.setEleNum(eleNum);
        cdz.setTime(time);
        idText.set(id);
        // emit: the bean is the key (so it gets sorted), the pile id is the value
        context.write(cdz, idText);
    }
}
```

### reducer

```
package sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Cdz, Text, Text, Cdz> {

    @Override
    protected void reduce(Cdz key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text item : values) {
            // item is the pile id and becomes the output key again;
            // the Cdz bean becomes the output value again
            context.write(item, key);
        }
    }
}
```

### driver

```
package sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Main2 extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // ToolRunner invokes the overridden run() method
        int res = ToolRunner.run(config, new Main2(), args);
        // exit the client with the job's status code
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "cdz_sort");

        // set the jar by class; omitting this can cause errors on the cluster
        job.setJarByClass(Main2.class);

        // set the mapper and reducer classes
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // map output key/value types
        job.setMapOutputKeyClass(Cdz.class);
        job.setMapOutputValueClass(Text.class);

        // final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Cdz.class);

        // input/output paths: hard-coded for local testing on Windows,
        // taken from the command-line arguments otherwise
        String os = System.getProperty("os.name").toLowerCase();
        String inPath;
        String outPath;
        if (os.startsWith("win")) {
            inPath = "D:\\bigdata\\chongdianzhuang\\res_first2";
            outPath = "D:\\bigdata\\chongdianzhuang\\res_sort";
        } else {
            inPath = args[0];
            outPath = args[1];
        }
        FileInputFormat.setInputPaths(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        // submit the job and wait for completion
        // (job.submit() alone would return without waiting)
        boolean result =
                job.waitForCompletion(true);
        // exit code 0 on success, 1 on failure
        return result ? 0 : 1;
    }
}
```

Output:

```
2 2,4,1700,375
3 3,4,850,86
1 1,5,650,84
6 6,1,150,20
5 5,1,150,12
4 4,2,150,13
```

# Running on YARN

Run:

```
hadoop jar /program/myjar/chongdianzhuang-0.0.1-SNAPSHOT.jar sort.Main2 /chongdianzhan/res_first /chongdianzhan/res_sort1
```

**Note:** the argument `/chongdianzhan/res_first` is the output of the previous MapReduce job.

The result:

```
2 2,4,1700,375
3 3,4,850,86
1 1,5,650,84
6 6,1,150,20
5 5,1,150,12
4 4,2,150,13
```

# Summary

The Cdz bean:

- implements the `WritableComparable` interface
- implements `compareTo()`; note that returning -1 sorts an object first and returning 1 sorts it last

map output:

- keyout: the bean, because MapReduce sorts by key, twice in the shuffle phase and once in the reduce phase
- valueout: the charging pile id

reduce output:

- keyout: the charging pile ID, same as before, because only the row order changes, not the column layout of the result file
- valueout: the Cdz object, same as before

Source: http://malaoshi.top/show_1IX2DPGdgGbm.html
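The descending rule in `compareTo()` can be checked outside Hadoop. The sketch below is a minimal, Hadoop-free stand-in for the `Cdz` bean (the `Station` class and its field names are illustrative, not part of the job above); sorting a plain list with the same comparison logic reproduces the key order the shuffle produces:

```
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the Cdz bean, keeping only the sort field.
class Station implements Comparable<Station> {
    final String id;
    final int eleNum; // total charge amount

    Station(String id, int eleNum) {
        this.id = id;
        this.eleNum = eleNum;
    }

    @Override
    public int compareTo(Station o) {
        // Larger eleNum sorts first: comparing o against this
        // inverts the natural ascending order.
        return Integer.compare(o.eleNum, this.eleNum);
    }
}

public class SortDemo {
    public static void main(String[] args) {
        List<Station> list = new ArrayList<>();
        list.add(new Station("1", 650));
        list.add(new Station("2", 1700));
        list.add(new Station("3", 850));
        // Collections.sort() uses compareTo(), like the shuffle key sort
        Collections.sort(list);
        StringBuilder sb = new StringBuilder();
        for (Station s : list) sb.append(s.id).append(" ");
        System.out.println(sb.toString().trim()); // prints: 2 3 1
    }
}
```

`Integer.compare(o.eleNum, this.eleNum)` is equivalent to the explicit -1/0/1 branches in `Cdz.compareTo()`, just more compact.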