资源预览内容
第1页 / 共39页
第2页 / 共39页
第3页 / 共39页
第4页 / 共39页
第5页 / 共39页
第6页 / 共39页
第7页 / 共39页
第8页 / 共39页
第9页 / 共39页
第10页 / 共39页
亲,该文档总共39页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述
Spark Transformation转换算子Spark Transformation转换算子一.Value类型1.map():映射2.mapPartitions():以分区为单位执行map *map()和mapPartitions()区别3.mapPartitionsWithIndex():带分区号的24.flatMap():扁平化5.glom():分区转数组6.groupBy():分组 WordCount案例7.filter():过滤8.sample():随机抽样9.distinct():去重10.coalesce():重新分区 1)不执行Shuffle方式默认 2)执行Shuffle方式 3)Shuffle原理11.repartition():重新分区 *coalesce和repartition区别12.sortBy():排序13.pipe():调用脚本二.双Value类型 1.union()并集 2.subtract ()差集 3.intersection()交集 4.zip()拉链三.Key-Value类型1.partitionBy():按照key重新分区2.reduceByKey():按照K聚合V3.groupByKey():按照K重新分组 *reduceByKey和groupByKey区别4.aggregateByKey():按照K处理分区内和分区间逻辑5.foldByKey():分区内和分区间相同的aggregateByKey()6.combineByKey():转换结构后分区内和分区间操作7.几种聚合算子的对比8.sortByKey():按照K进行排序9.mapValues():只对V进行操作10.join():连接,将相同key对应的多个value关联在一起11.cogroup():类似全连接,但是在同一个RDD中对key聚合RDD转换算子整体上分为:Value类型、双Value类型和Key-Value类型一.Value类型顾名思义是对单个value值进行运算的算子类型。下面主要从函数签名、功能、案例+图解三个方法介绍这几类算子。1.map():映射1)函数签名:def mapU: ClassTag(f: T = U): RDDU2)功能说明参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。map方法的部分源代码:def mapU: ClassTag(f:T= U): RDDU= withScope val clea nF= sc. clea n(f) new MapPartitionsRDDU, T(this, (context, pid, iter) = iter.m ap(cleanF)privatespark class MapPartitionsRDDU: ClassTag T: ClassTag( . override def getPartitions: ArrayPartition= firstParentT.partit ions, .protectedspark def firstParentU; ClassTag RDDU= dependencies. head.rdd. asInst anceOfRDDU3)案例:创建一个1-4数组的RDD,两个分区,将所有元素*2形成新的RDDobject TestMap def main(args: ArrayString): Unit = /1.创建SparkConf并设置App名称 val conf = new SparkConf().setAppName(CT).setMaster(local*) /2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) /3具体业务逻辑 / 3.1 创建一个RDD val rdd: RDDInt = sc.makeRDD(1 to 4,2) / 3.2 调用map方法,每个元素乘以2 val mapRdd: RDDInt = rdd.map(_ * 2) / 3.3 打印修改后的RDD中数据 mapRdd.collect().foreach(println) /4.关闭连接 sc.stop() 说明:由于具体的业务逻辑代码只有第3部分几行,剩下的关于SparkContext对象的创建及资源关闭,后面的案例将直接省略这部分代码。2.mapPartitions():以分区为单位执行mapmapPartitions:以分区为单位,对RDD中的元素进行映射。一般适用于批处理的操作,比如:将RDD中的元素插入到数据库中,需要数据库连接,如果每一个元素都创建一个连接,效率很低;可以对每个分区的元素,创建一个连接。1)函数签名:def mapPartitionsU: ClassTag(f: IteratorT = IteratorU, preservesPartitioning: Boolean = false): RDDU) 参数说明: f函数:把每一个分区的数据,分别放入到迭代器中,进行批处理。传入的是一个迭代器,也就是该分区的整体数据。 preservePartitioning: 是否保留上游RDD的分区信息,默认flase2)功能说明: Map是一次处理一个元素,而mapPartitions一次处理一个分区数据3)案例:创建个RDD,4个元素,2个分区,使每个元素*2组成新的RDD/3具体业务逻辑/ 3.1 创建一个RDDval rdd: RDDInt = sc.makeRDD(1 to 4, 2)/ 3.2 调用mapPartitions方法,每个元素乘以2val rdd1 = rdd.mapPartitions(x=x.map(_*2)/注意与map处理数据的区别:/val mapRdd: RDDInt = rdd.map(_ * 2)/ 3.3 打印修改后的RDD中数据rdd1.collect().foreach(println)*map()和mapPartitions()区别mapPartitions一般适用于批处理的操作,比如:将RDD中的元素插入到数据库中,需要数据库选接,如果每一个元素都创建一个连接, 效率很低可以对每个分区的元素,创建一个连接3.mapPartitionsWithIndex():带分区号的2mapPartitionsWithIndex:以分区为单位,对RDD中的元素进行映射,并且带分区编号1)函数签名:def mapPartitionsWithIndexU: ClassTag( f: (Int, IteratorT) = IteratorU, preservesPartitioning: Boolean = false): RDDU)Int即表示分区编号.2)功能说明:类似于mapPartitions,比mapPartitions多一个整数参数表示分区号3)案例:创建一个RDD,使每个元素跟所在分区号形成一个元组,组成一个新的RDD/3具体业务逻辑/ 3.1 创建一个RDDval rdd: RDDInt = sc.makeRDD(1 to 4,2)/ 3.2 调用map方法,每个元素乘以2val mapRdd = rdd.mapPartitionsWithIndex( (index,items) = items.map(a = (index,a) )/ 3.3 打印修改后的RDD中数据mapRdd.collect().foreach(println)4.flatMap():扁平化faltMap:对集合中的元素进行扁平化处理1)函数签名:def flatMapU: ClassTag(f: T = TraversableOnceU): RDDU)2)功能说明 与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中区别: 在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中3)案例:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中/3具体业务逻辑/ 3.1 创建一个RDDval listRDD=sc.makeRDD(List(List(1,2),List(3,4),List(5,6),List(7), 2)/ 3.2 把所有子集合中数据取出放入到一个大的集合中listRDD.flatMap(list=list).collect.foreach(println)注意:如果匿名函数输入和输出相同,那么不能简化 listRDD.flatMap(list=list)5.glom():分区转数组glom:将RDD一个分区中的元素,组合成一个新的数组1)函数签名:def glom(): RDDArrayT2)功能说明 该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致3)案例:创建一个2个分区的RDD,并将每个分区的数据放到一个数组,求出每个分区的最大值/ 3.1 创建一个RDDval rdd = sc.makeRDD(1 to 4, 2)/ 3.2 求出每个分区的最大值 0-1,2 1-3,4/glom求出的是Array,map遍历的就是Arrayval maxRdd: RDDInt = rdd.glom().map(_.max)/ 3.3 求出所有分区的最大值的和 2 + 4println(maxRdd.collect().sum)6.groupBy():分组groupBy:按照指定的规则,对RDD中的元素进行分组1)函数签名:def groupByK(f: T = K)(implicit kt: ClassTagK): RDD(K, IterableT)2)功能说明 分组,按照传入函数的返回值进行分组; 将相同的key对应的值放入一个迭代器:如 1,2,3,4 按照奇偶数分组 (0,CompactBuffer(2, 4) (1,CompactBuffer(1, 3)3)案例:创建一个RDD,按照元素模以2的值进行分组。/ 3.1 创建一个RDDval rdd = sc.makeRDD(1 to 4, 2)/
网站客服QQ:2055934822
金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号