spark3.0教程:RDD行动算子aggregate() 作者:马育民 • 2021-06-27 15:31 • 阅读:10163 # 说明 设置初始值,初始值和分区内的数据进行聚合,得到各个分区的结果;然后将初始值和各个分区的结果(分区间的数据)聚合 **分区内数据操作方式** 和 **分区间数据操作方式** 可以分别指定 执行过程如下: [![](https://www.malaoshi.top/upload/pic/spark/QQ20210627154352.jpg)](https://www.malaoshi.top/upload/pic/spark/QQ20210627154352.jpg) 类似算子:fold() # 声明 ``` def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U ``` **形参:** - zeroValue:初始化值。在比较大小时,需要有初始值 - seqOp:分区内操作 - combOp:分区间操作 **注意:** `aggregate` 后面带有 2个 `()` ,这是 [函数柯里化](https://www.malaoshi.top/show_1IX1BllyYdpN.html "函数柯里化") **返回值:** - 返回计算结果 ### 例子 ``` val sparConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparConf) val rdd:RDD[Int] = sc.makeRDD( Array( 1,2,3,4,5,6 ),2 ) //这里初始值只为了演示效果 val res:Int=rdd.aggregate(10)(_ + _,_ + _) println(res) sc.stop() ``` # 案例 取分区间最大值,然后相加 ### 写法一 ``` val sparConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparConf) val rdd:RDD[Int] = sc.makeRDD( Array( 1,2,3,4,5,6 ),2 ) val res:Int=rdd.aggregate(0)((e1,e2)=>math.max(e1,e2),_ + _) println(res) sc.stop() ``` 执行结果: ``` 9 ``` **解释:** 1. 分区内:初始值 `0` 先与分区内的数据做 **比较**,并返回 **最大值** 2. 分区间:求和 `0+3+6` ### 注意 如果初始值设为 `10`,那么返回是30 **解释:** 1. 分区内:初始值 `10` 先与分区的数据做比较,2个分区的最大值是`10` 2. 分区间:求和 `10(初始值) + 10(第一个分区最大值) + 10(第二个分区最大值)` ### 写法二 ``` val sparConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparConf) val rdd:RDD[Int] = sc.makeRDD( Array( 1,2,3,4,5,6 ),2 ) val res:Int=rdd.aggregate(0)((e1,e2)=> { if(e1>e2){ e1 }else{ e2 } },_ + _) println(res) sc.stop() ``` ### 用return报错 ``` val sparConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparConf) val rdd:RDD[Int] = sc.makeRDD( Array( 1,2,3,4,5,6 ),2 ) val res:Int=rdd.aggregate(0)((e1,e2)=> { if(e1>e2){ return e1//用return报错 }else{ return e2//用return报错 } },_ + _) println(res) sc.stop() ``` 原文出处:http://malaoshi.top/show_1IX1O4vo6Rdy.html