hadoop3.x mapreduce案例:自定义分区-根据省份拆分数据(一)本机运行 作者:马育民 • 2025-12-06 09:21 • 阅读:10004 # 案例说明 电商平台订单数据太多,如下: ``` 订单ID,用户ID,金额,省份 001,1001,89.5,北京 002,1002,199.9,上海 003,1003,299.8,广州 004,1004,59.9,深圳 005,1005,399.7,杭州 006,1006,499.6,成都 007,1007,69.8,南京 ``` 将 **订单数据按省份分区** ,便于后面做统计分析。 **核心目标**:通过 **自定义分区** 精准控制 Map 输出数据的分发规则,让 **不同省份** 的订单数据定向流转到 **指定的 Reduce 任务** 处理,将数据输出到不同的文件中 # 工程 在 [hadoop3.x mapreduce 实现wordcount(本机运行)](https://www.malaoshi.top/show_1IXbjGltYlE.html "hadoop3.x mapreduce 实现wordcount(本机运行)") 工程进行操作 # java代码 ### mapper类 ``` package top.malaoshi.order; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 继承自 Mapper 类 * 泛型解释: * LongWritable:每一行开始位置的偏移量,对应 map() 方法第1个形成类型 * Text:输入数据的值,通常是一行文本数据,对应 map() 方法第2个形成类型 * Text:context写入的key的数据类型 * Text:context写入的value的数据类型 */ public class OrderMapper extends Mapper { // hadoop 实现序列化好处:复用同一个对象 private Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 必须通过下面方式转成字符串,否则可能中文乱码 String newValue = new String(value.getBytes(),0,value.getLength(),"utf-8"); // 以“,”分割成数组 String[] arr = newValue.split(","); // 检查分割后的数组是否符合要求 if(arr!=null && arr.length == 4) { // 将省份作为key k.set(arr[3]); } context.write(k,value); } } ``` ### 自定义分区类 ``` package top.malaoshi.order; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import java.io.UnsupportedEncodingException; /** * 继承 Partitioner 抽象类 * 泛型解释: * Text:Mapper 输出的 key 的数据类型 * Text:Mapper 输出的 value 的数据类型 */ public class MyPartition extends Partitioner { // 省份数组 private String[] provinceArray = { "北京市", "上海市", "广东省", "浙江省", "四川省", "江苏省", "湖北省", "重庆市", "陕西省" }; @Override public int getPartition(Text key, Text value, int numPartitions) { try { // 必须通过下面方式转成字符串,否则可能中文乱码 String newKey = new String(key.getBytes(),0,key.getLength(),"utf-8"); // 将省份与数组的每个元素做比较,如果相等,就返回 索引+1。这里不要使用0号文件。匹配不上的省份,统一放到0号文件 for(int i = 0,count = provinceArray.length;i { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for(Text item : values){ context.write(NullWritable.get(),item); } } } ``` ### 主启动类-driver ``` package top.malaoshi.order; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import top.malaoshi.uid.UidMapper; import top.malaoshi.uid.UidReducer; import java.io.IOException; public class Main { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 检查命令行参数,确保提供了输入和输出路径 if (args.length != 2 || args == null){ // 检查命令行参数是否正确(需要两个参数) System.err.println("please input current Path"); // 提示用户输入路径 System.exit(1); // 参数不正确时退出程序 } // 创建Hadoop Configuration对象,用于加载配置信息 Configuration conf = new Configuration(); // 创建一个Job实例,指定任务名 Job job = new Job(conf, "订单按省份拆分"); // 设置主类,以便在执行时能够找到该类。这允许Hadoop以jar包形式运行此程序 job.setJarByClass(Main.class); // 设置Map阶段使用的Mapper类 job.setMapperClass(OrderMapper.class); // 设置Reduce阶段使用的reduce类 job.setReducerClass(OrderReducer.class); // 设置 Mapper 输出key-value对的数据类型 // 与 job.setOutputKeyClass()、job.setOutputValueClass() 一致时可略 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置输出key-value对的数据类型,必须与 Reducer 泛型一致 job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); // 设置自定义分区类 job.setPartitionerClass(MyPartition.class); // 设置 reduce 数量,n个数量,对应生成 n 个文件,所以数量不能少于省份的数量。不设置分区无效 job.setNumReduceTasks(10); // 添加输入路径到job中 FileInputFormat.addInputPath(job, new Path(args[0])); // 设置输出路径到job中,注意:该目录不应存在 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交job,并等待完成。根据job是否成功完成返回状态码 boolean result = job.waitForCompletion(true); System.exit( result ? 0 : 1); } } ``` # 本地调试 1. 在开发机器上运行,如:windows 2. 读取的数据 和 输出结果 在 **本地** 上 **注意:**需要搭建开发环境 [hadoop3.x HDFS JAVA开发介绍](https://www.malaoshi.top/show_1IXaEESC2g9.html "hadoop3.x HDFS JAVA开发介绍") # 准备数据 数据下载见 [链接](https://www.malaoshi.top/show_1GW2M9gj63uR.html "链接") 下载后,将数据文件放在 `D:\bigdata_std\mapreduce\wordcount\wordcount\data2` 目录 # idea 操作 ### 设置启动参数 在主启动类点右键,如下操作: [](https://www.malaoshi.top/upload/pic/hadoop/QQ20210217111512.png) [](https://www.malaoshi.top/upload/0/0/1GW2M9jo71Bh.png) 红框处是:`D:\bigdata_std\mapreduce\wordcount\wordcount\data2 D:\bigdata_std\mapreduce\wordcount\wordcount\result_order` **解释:** 中间有空格,这是2个参数 - 第一个参数:读取数据所在的目录 - 第二个参数:生成结果的目录,执行前,`result_filter` 目录必须删除,否则报错 ### 运行 然后 `run` 或 `debug` 运行 ### 执行结果 [](https://www.malaoshi.top/upload/0/0/1GW2M9kc7wTl.png) 如果省份都能匹配上,`0` 号文件为空 `1` 号文件都是北京的订单数据: [](https://www.malaoshi.top/upload/0/0/1GW2M9l6gSvq.png) 原文出处:http://malaoshi.top/show_1GW2M9mMouhv.html