资源预览内容
第1页 / 共5页
第2页 / 共5页
第3页 / 共5页
第4页 / 共5页
第5页 / 共5页
亲,该文档总共5页全部预览完了,如果喜欢就下载吧!
资源描述
1 基础概念1 spark是什么?RDD是什么?Spark 是一个用来实现快速而通用的集群计算的平台。RDD是弹性分布式数据集,弹性是指数据默认放在内存,当内存放不下了会放到磁盘,RDD的分区数量可以改变,通过repartition可以改变RDD的分区数量。分布式表示数据被分配到了多台机器,可以并行在集群计算。 Application:用户编写的Spark应用程序,由一个或多个Job组成。提交到Spark之后,Spark会为Application分配资源,将程序进行转换并执行。 Job(作业):由Action算子触发生成的由一个或多个Stage组成的计算作业。 Stage(调度阶段):每个Job会根据RDD的宽依赖被切分为多个Stage,每个Stage都包含一个TaskSet。 TaskSet(任务集):一组关联的,但相互之间没有shuffle依赖关系的Task集合。一个TaskSet对应的调度阶段。 Task(任务):RDD中的一个分区对应一个Task,Task是单个分区上最小的处理流程单元。2 RDD的特性?分区:通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。RDD 是由多个 partition(某个节点里某一片连续的数据)组成的 List,将数据加载为 RDD 时,一般一个 hdfs 中的 block 会加载为一个 partition。依赖:记录子RDD对父RDD的依赖,当RDD操作出错时,根据依赖关系重新计算丢失分区的数据。窄依赖重新计算数据利用率是百分百,宽依赖不是百分百。算子操作:转换(transformation)与行动(Action)。Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到有Action操作的时候才会真正触发运算。Action算子会触发Spark提交作业(Job),并将数据输出Spark系统。位置优先(可选):也就是数据的本地性,比如 HDFS 的 block 的所在位置应该是优先计算的位置。本地有数据就取本地,没有去远程拉取。分区器(可选):只针对KV类型的RDD。可以传递自定义 Partitioner 重新分区。默认为Hash。3 RDD算子Transformation:返回新 RDD的操作,转化出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。Map(func):对调用map的RDD中的每个元素都执行func,返回一个新的RDD。Filter(func):对调用filter的RDD中的每个元素都执行func,返回一个包含使func为true的元素组成的RDD。Distinct:对RDD进行去重操作。Union:生成包含两个RDD元素的新RDD。MapValues:针对PairRDD,key不变,对value进行操作,和key组成新的元素。reduceByKey:对key相同的元素的value值执行funct的操作,返回新的RDDgroupByKey:将相同Key的值进行聚集,输出一个(K, IterableV)类型的RDDsortByKey:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。Action:Collect:以数组形式返回RDD内的所有元素CollectAsMap:以map的形式返回PairRDD的所有元素Count:返回RDD中元素的个数First:返回RDD中的首个元素。Foreach:逐个处理RDD中的元素。持久化:对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。2 什么是宽窄依赖、DAG、stage?窄依赖:父RDD和子RDD的partition之间一对一,或者当父RDD的partition只对应一个子RDD的partition时,父RDD的partition和子RDD partition之间的多对一关系。例如:map、filter、union宽依赖:父RDD的partition和子RDD的partition的多对一关系。例如:reduceByKey、groupByKeyDAG:spark会根据RDD之间的依赖关系,形成一个DAG有向无环图。DAG会提交给DAGschedular,DAGschedular根据宽窄依赖划分stage,每遇到一个宽依赖就划分为一个stage,每个stage包含一个或一组task组成,task以taskset的形式提交给taskschedular。Stage的task并行度是由stage的最后一个RDD的分区数来决定的3 spark资源/任务调度流程1.集群启动之后,worker节点向Master节点汇报资源情况,这样Master就掌握了集群的资源情况。2.当spark提交应用之后,会根据RDD之间的依赖关系将application形成一个DAG有向无环图。任务提交之后,spark会在Driver端创建两个对象,DAGschedular和taskschedular。3.DAGschedular是任务调度的高层调度器,会根据RDD的宽窄依赖划分stage,每个stage以taskSet的形式提交给taskschedular,taskschedular是任务调度的低层调度器。4.taskschedular会遍历taskset集合, 拿到每个task会把每个task发送到计算节点Executor中去执行。5.task在Executor线程池中的运行情况会向TaskScheduler反馈,当task执行失败时,则由TaskScheduler负责重试,将task重新发送给Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的stage就失败了。stage失败了则由DAGScheduler来负责重试,重新发送TaskSet到TaskSchdeuler,Stage默认重试4次。如果重试4次以后依然失败,那么这个job就失败了。job失败了,Application就失败了。因此一个task默认情况下重试3*4=12次。粗粒度资源申请:在application执行之前完成资源申请,资源申请成功之后才会进行任务调度,当所有的task执行完成之后,才会释放这部分资源。优点:在application执行之前资源就已经申请完毕,每个task可以直接使用资源,不需要task在执行前自己去申请资源,这样task启动就快乐,stage就快了,job就快了,application就快了。缺点:直到最后一个task执行完才释放资源,集群资源无法有效利用。细粒度资源申请:在application执行之前不进行资源申请,每个task自己去申请资源,task执行变慢,application变慢,但是可以充分利用集群资源。4 数据倾斜数据倾斜是指shuffle过程中,把各个节点上相同key拉取到某个节点上的一个task来处理,此时,如果某个节点上的key对应的数据量特别大时,就会发生数据倾斜。并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。(磁盘IO和网络数据传输是shuffle性能差的原因。)现象:同一个stage中的相同task,大部分task执行非常快,少数task执行非常慢,慢的耗时是快的23倍;原本正常运行的任务某天突然outofmemory,也可能是数据倾斜。后果(慢/outofmemory):stage的执行时间由最后一个task决定,因此运行缓慢的task会拖慢整个应用的速度。过多的数据在同一个task中执行,将会把executor撑爆。解决办法:1 数据问题(1) 无效的数据过多,比如null,进行过滤(2) 有效数据A) 对数据源进行聚合B) 提高shuffle的操作reduce并行度。C) Key添加随机数实现双重聚合,针对reduceByKey和groupByKeyD) reduce join 转换为map join,一个小的RDD和一个大的RDD进行join操作,把小的RDD的数据广播出去。E) 把产生倾斜的Key单独拉出来。和其他的key进行join操作,把这个key的数据分散到各个task上。F) 小表扩大n倍,大表增加随机数5 spark调优原则:1尽量避免创建重复的RDD,我们在开发spark应用时,首先是基于某个数据源创建一个初始RDD,然后对这个RDD执行某个算子操作,得到下一个RDD,以此类推最后得到结果,这个过程多个RDD经过算子操作串起来形成一个RDD血缘关系链。对于一份数据不应该创建多个RDD。2尽可能的复用RDD,从而尽可能减少算子的执行次数。有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。3对多次使用的RDD进行持久化。Spark每次对一个算子执行操作时,都会从源头开始计算,因此应该把多次使用的RDD进行持久化,保存到内存或者磁盘,直接读取RDD进行算子操作。4尽量避免使用shuffle类算子。比如reduceByKey,join等。用broadcast+map实现join操作。5 使用map-side预聚合的shuffle操作。用reduceByKey代替groupByKey。在节点本地对相同的Key聚合一次。6 使用高性能的算子。比如用reduceByKey代替groupByKey,mapPartitions 替代普通 map。
收藏 下载该资源
网站客服QQ:2055934822
金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号