2019独角兽企业重金招聘Python工程师标准>>>
package com.xp.cnimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}/** * Created by xupan on 2017/12/15. * Spark操作—aggregate:action * 将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。 * 这个函数最终返回的类型不需要和RDD中元素类型一致。 */object AggerateDemo { def main(args: Array[String]) { Logger.getLogger(“org.apache.spark”).setLevel(Level.ERROR) //SparkContext:spark执行入口 val sc: SparkContext = new SparkContext( new SparkConf() .setAppName(“WordCount”) .setMaster(“local[4]”) ) /** * 先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1 * 即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16 * part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41 * 再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1 * 即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58 **/ val kk = sc.parallelize(Array(5, 4, 3, 2, 1, 10, 9, 8, 7, 6), 2) val oo = kk.aggregate(1)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a + b }) println(oo + “oo”) //58 /** * ##这次zeroValue=2 ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17 ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42 ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428 */ val res = kk.aggregate(2)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a * b }) println(“res :” + res) /** 1. 0+1, 0+1 2. 1+2, 1+1 3. 3+3, 2+1 4. 6+4, 3+1 5. 10+5, 4+1 …… 实际Spark执行中是分布式计算,可能会把List分成多个分区, 假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9), 经过计算各分区的的结果(10,4),(26,4),(9,1), 这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2) 就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了。 */ val ll = kk.aggregate((0, 0))( (acc, number) => (acc._1 + number, acc._2 + 1), (par1, par2) => (par1._1 + par2._1, par1._2 + par2._2) ) println(“ll : ” + (ll._1.toDouble / ll._2)) //聚合求,指定2个分区,每个分区对应一个task val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2) //aggregate:聚合,有两个括号的方法,柯立化方法,初始值为0,传递了两个函数 //第一个:每个分区上做聚合 + 1 //第二个:将每个分区的结果再次聚合 + 1 val aggRDD = rdd1.aggregate(1)(_ + _, _ + _) println(” aggRDD : ” + aggRDD) //求每个分区的最大值,再将结果聚合 val result = rdd1.aggregate(1)(math.max(_, _), _ + _) println(” result : ” + result) //14 val rdd2 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2) val result2 = rdd2.aggregate(0)(math.max(_, _), _ + _) println(” result2 : ” + result2) //18 //第一个参数2也要参与计算,只是最后参加一次 val result3 = rdd2.aggregate(2)(math.max(_, _), _ + _) println(“result3 : ” + result3) //result3 : 20 println(“===============================”) val rddS = sc.parallelize(Array(“a”, “b”, “c”, “d”, “e”), 2) val resultS = rddS.aggregate(“”)(_ + _, _ + _) //结果在变化 resultS : cdeab resultS : abcde //变化的原因是因为并不确定哪个分区先执行完成 println(” resultS : ” + resultS) println(“================================”) //计算每个分区最大的字符串长度,并转换为字符串相加 //rdds2Result : 65 rdds2Result : 56 //结果在变化 val rdds2 = sc.parallelize(Array(“sda”, “bsdfs”, “c”, “d”, “esdfsd”), 2); val rdds2Result = rdds2.aggregate(“”)((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y) println(” rdds2Result : ” + rdds2Result) println(“================================”) val rddw = sc.parallelize(List(“4444”, “11113”), 2) val rddws = rddw.aggregate(“”)((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) println(” rddws :” + rddws) //关闭资源 sc.stop() }}
转载于:https://my.oschina.net/u/2253438/blog/1590649