哪个网站可以接做美工的活儿,济南建网站公司价格,网站建设人力资源人员配置,地图网站制作公众号后台回复关键词#xff1a;pyspark#xff0c;获取本项目github地址。Spark程序可以快如闪电⚡️#xff0c;也可以慢如蜗牛?。它的性能取决于用户使用它的方式。一般来说#xff0c;如果有可能#xff0c;用户应当尽可能多地使用SparkSQL以取得更好的性能。主要原… 公众号后台回复关键词pyspark获取本项目github地址。Spark程序可以快如闪电⚡️也可以慢如蜗牛?。它的性能取决于用户使用它的方式。一般来说如果有可能用户应当尽可能多地使用SparkSQL以取得更好的性能。主要原因是SparkSQL是一种声明式编程风格背后的计算引擎会自动做大量的性能优化工作。基于RDD的Spark的性能调优属于坑非常深的领域并且很容易踩到。我们将介绍Spark调优原理Spark任务监控以及Spark调优案例。本文参考了以下文章《Spark性能优化指南——基础篇》https://tech.meituan.com/2016/04/29/spark-tuning-basic.html《Spark性能优化指南——高级篇》https://tech.meituan.com/2016/05/12/spark-tuning-pro.html《spark-调节executor堆外内存》https://www.cnblogs.com/colorchild/p/12175328.htmlimport findspark#指定spark_home为刚才的解压路径,指定python路径spark_home /Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2python_path /Users/liangyun/anaconda3/bin/pythonfindspark.init(spark_home,python_path)import pyspark from pyspark.sql import SparkSession#SparkSQL的许多功能封装在SparkSession的方法接口中spark SparkSession.builder \ .appName(test) \ .config(master,local[4]) \ .enableHiveSupport() \ .getOrCreate()sc spark.sparkContext一Spark调优原理 可以用下面三个公式来近似估计spark任务的执行时间。可以用下面二个公式来说明spark在executor上的内存分配。如果程序执行太慢调优的顺序一般如下1首先调整任务并行度并调整partition分区。2尝试定位可能的重复计算并优化之。3尝试定位数据倾斜问题或者计算倾斜问题并优化之。4如果shuffle过程提示堆外内存不足考虑调高堆外内存。5如果发生OOM或者GC耗时过长考虑提高executor-memory或降低executor-core。以下是对上述公式中涉及到的一些概念的初步解读。任务计算总时间假设由一台无限内存的同等CPU配置的单核机器执行该任务所需要的运行时间。通过缓存避免重复计算通过mapPartitions代替map以减少诸如连接数据库预处理广播变量等重复过程都是减少任务计算总时间的例子。shuffle总时间任务因为reduceByKeyjoinsortBy等shuffle类算子会触发shuffle操作产生的磁盘读写和网络传输的总时间。shuffle操作的目的是将分布在集群中多个节点上的同一个key的数据拉取到同一个节点上以便让一个节点对同一个key的所有数据进行统一处理。shuffle过程首先是前一个stage的一个shuffle write即写磁盘过程中间是一个网络传输过程然后是后一个stage的一个shuffle read即读磁盘过程。shuffle过程既包括磁盘读写又包括网络传输非常耗时。因此如有可能应当避免使用shuffle类算子。例如用mapbroadcast的方式代替join过程。退而求其次也可以在shuffle之前对相同key的数据进行归并减少shuffle读写和传输的数据量。此外还可以应用一些较为高效的shuffle算子来代替低效的shuffle算子。例如用reduceByKey/aggregateByKey来代替groupByKey。最后shuffle在进行网络传输的过程中会通过netty使用JVM堆外内存spark任务中大规模数据的shuffle可能会导致堆外内存不足导致任务挂掉这时候需要在配置文件中调大堆外内存。GC垃圾回收总时间当JVM中execution内存不足时会启动GC垃圾回收过程。执行GC过程时候用户线程会终止等待。因此如果execution内存不够充分会触发较多的GC过程消耗较多的时间。在spark2.0之后excution内存和storage内存是统一分配的不必调整excution内存占比可以提高executor-memory来降低这种可能。或者减少executor-cores来降低这种可能(这会导致任务并行度的降低)。任务有效并行度任务实际上平均被多少个core执行。它首先取决于可用的core数量。当partition分区数量少于可用的core数量时只会有partition分区数量的core执行任务因此一般设置分区数是可用core数量的2倍以上20倍以下。此外任务有效并行度严重受到数据倾斜和计算倾斜的影响。有时候我们会看到99%的partition上的数据几分钟就执行完成了但是有1%的partition上的数据却要执行几个小时。这时候一般是发生了数据倾斜或者计算倾斜。这个时候我们说任务实际上有效的并行度会很低因为在后面的这几个小时的绝大部分时间只有很少的几个core在执行任务。任务并行度任务可用core的数量。它等于申请到的executor数量和每个executor的core数量的乘积。可以在spark-submit时候用num-executor和executor-cores来控制并行度。此外也可以开启spark.dynamicAllocation.enabled根据任务耗时动态增减executor数量。虽然提高executor-cores也能够提高并行度但是当计算需要占用较大的存储时不宜设置较高的executor-cores数量否则可能会导致executor内存不足发生内存溢出OOM。partition分区数量分区数量越大单个分区的数据量越小任务在不同的core上的数量分配会越均匀有助于提升任务有效并行度。但partition数量过大会导致更多的数据加载时间一般设置分区数是可用core数量的2倍以上20倍以下。可以在spark-submit中用spark.default.parallelism来控制RDD的默认分区数量可以用spark.sql.shuffle.partitions来控制SparkSQL中给shuffle过程的分区数量。数据倾斜度数据倾斜指的是数据量在不同的partition上分配不均匀。一般来说shuffle算子容易产生数据倾斜现象某个key上聚合的数据量可能会百万千万之多而大部分key聚合的数据量却只有几十几百个。一个partition上过大的数据量不仅需要耗费大量的计算时间而且容易出现OOM。对于数据倾斜一种简单的缓解方案是增大partition分区数量但不能从根本上解决问题。一种较好的解决方案是利用随机数构造数量为原始key数量1000倍的中间key。大概步骤如下利用1到1000的随机数和当前key组合成中间key中间key的数据倾斜程度只有原来的1/1000, 先对中间key执行一次shuffle操作得到一个数据量少得多的中间结果然后再对我们关心的原始key进行shuffle得到一个最终结果。计算倾斜度计算倾斜指的是不同partition上的数据量相差不大但是计算耗时相差巨大。考虑这样一个例子我们的RDD的每一行是一个列表我们要计算每一行中这个列表中的数两两乘积之和这个计算的复杂度是和列表长度的平方成正比的因此如果有一个列表的长度是其它列表平均长度的10倍那么计算这一行的时间将会是其它列表的100倍从而产生计算倾斜。计算倾斜和数据倾斜的表现非常相似我们会看到99%的partition上的数据几分钟就执行完成了但是有1%的partition上的数据却要执行几个小时。计算倾斜和shuffle无关在map端就可以发生。计算倾斜出现后一般可以通过舍去极端数据或者改变计算方法优化性能。堆内内存on-heap memory, 即Java虚拟机直接管理的存储由JVM负责垃圾回收GC。由多个core共享core越多每个core实际能使用的内存越少。core设置得过大容易导致OOM并使得GC时间增加。堆外内存off-heap memory, 不受JVM管理的内存, 可以精确控制申请和释放, 没有GC问题。一般shuffle过程在进行网络传输的过程中会通过netty使用到堆外内存。二Spark任务UI监控 Spark任务启动后可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况包括任务进度耗时分析存储分析shuffle数据量大小等。最常查看的页面是 Stages页面和Excutors页面。Jobs每一个Action操作对应一个Job以Job粒度显示Application进度。有时间轴Timeline。StagesJob在遇到shuffle切开Stage显示每个Stage进度以及shuffle数据量。可以点击某个Stage进入详情页查看其下面每个Task的执行情况以及各个partition执行的费时统计。Storage:监控cache或者persist导致的数据存储大小。Environment:显示spark和scala版本依赖的各种jar包及其版本。Excutors : 监控各个Excutors的存储和shuffle情况。SQL: 显示各种SQL命令在那些Jobs中被执行。三Spark调优案例 下面介绍几个调优的典型案例1资源配置优化2利用缓存减少重复计算3数据倾斜调优4broadcastmap代替join5reduceByKey/aggregateByKey代替groupByKey1资源配置优化下面是一个资源配置的例子优化前#提交python写的任务spark-submit --master yarn \--deploy-mode cluster \--executor-memory 12G \--driver-memory 12G \--num-executors 100 \--executor-cores 8 \--conf spark.yarn.maxAppAttempts2 \--conf spark.task.maxFailures10 \--conf spark.stage.maxConsecutiveAttempts10 \--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境--files data.csv,profile.txt--py-files pkg.py,tqdm.pypyspark_demo.py 优化后#提交python写的任务spark-submit --master yarn \--deploy-mode cluster \--executor-memory 12G \--driver-memory 12G \--num-executors 100 \--executor-cores 2 \--conf spark.yarn.maxAppAttempts2 \--conf spark.default.parallelism1600 \--conf spark.sql.shuffle.partitions1600 \--conf spark.memory.offHeap.enabledtrue \--conf spark.memory.offHeap.size2g\--conf spark.task.maxFailures10 \--conf spark.stage.maxConsecutiveAttempts10 \--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境--files data.csv,profile.txt--py-files pkg.py,tqdm.pypyspark_demo.py 这里主要减小了 executor-cores数量一般设置为1~4过大的数量可能会造成每个core计算和存储资源不足产生OOM也会增加GC时间。此外也将默认分区数调到了1600并设置了2G的堆外内存。2, 利用缓存减少重复计算%%time# 优化前:import math rdd_x sc.parallelize(range(0,2000000,3),3)rdd_y sc.parallelize(range(2000000,4000000,2),3)rdd_z sc.parallelize(range(4000000,6000000,2),3)rdd_data rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))s rdd_data.reduce(lambda a,b:ab0.0)n rdd_data.count()mean s/n print(mean)-1.889935655259299CPU times: user 40.2 ms, sys: 12.4 ms, total: 52.6 msWall time: 2.76 s%%time # 优化后: import math from pyspark.storagelevel import StorageLevelrdd_x sc.parallelize(range(0,2000000,3),3)rdd_y sc.parallelize(range(2000000,4000000,2),3)rdd_z sc.parallelize(range(4000000,6000000,2),3)rdd_data rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)s rdd_data.reduce(lambda a,b:ab0.0)n rdd_data.count()mean s/n rdd_data.unpersist()print(mean)-1.889935655259299CPU times: user 40.5 ms, sys: 11.5 ms, total: 52 msWall time: 2.18 s3, 数据倾斜调优%%time # 优化前: rdd_data sc.parallelize([hello world]*1000000[good morning]*10000[I love spark]*10000)rdd_word rdd_data.flatMap(lambda x:x.split( ))rdd_one rdd_word.map(lambda x:(x,1))rdd_count rdd_one.reduceByKey(lambda a,b:ab0.0)print(rdd_count.collect()) [(good, 10000.0), (hello, 1000000.0), (spark, 10000.0), (world, 1000000.0), (love, 10000.0), (morning, 10000.0), (I, 10000.0)]CPU times: user 285 ms, sys: 27.6 ms, total: 313 msWall time: 2.74 s%%time # 优化后: import random rdd_data sc.parallelize([hello world]*1000000[good morning]*10000[I love spark]*10000)rdd_word rdd_data.flatMap(lambda x:x.split( ))rdd_one rdd_word.map(lambda x:(x,1))rdd_mid_key rdd_one.map(lambda x:(x[0]_str(random.randint(0,999)),x[1]))rdd_mid_count rdd_mid_key.reduceByKey(lambda a,b:ab0.0)rdd_count rdd_mid_count.map(lambda x:(x[0].split(_)[0],x[1])).reduceByKey(lambda a,b:ab0.0)print(rdd_count.collect()) #作者按此处仅示范原理单机上该优化方案难以获得性能优势[(good, 10000.0), (hello, 1000000.0), (spark, 10000.0), (world, 1000000.0), (love, 10000.0), (morning, 10000.0), (I, 10000.0)]CPU times: user 351 ms, sys: 51 ms, total: 402 msWall time: 7 s4, broadcastmap代替join该优化策略一般限于有一个参与join的rdd的数据量不大的情况。%%time # 优化前:rdd_age sc.parallelize([(LiLei,18),(HanMeimei,19),(Jim,17),(LiLy,20)])rdd_gender sc.parallelize([(LiLei,male),(HanMeimei,female),(Jim,male),(LiLy,female)])rdd_students rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))print(rdd_students.collect())[(LiLy, 20, female), (LiLei, 18, male), (HanMeimei, 19, female), (Jim, 17, male)]CPU times: user 43.9 ms, sys: 11.6 ms, total: 55.6 msWall time: 307 ms%%time # 优化后:rdd_age sc.parallelize([(LiLei,18),(HanMeimei,19),(Jim,17),(LiLy,20)])rdd_gender sc.parallelize([(LiLei,male),(HanMeimei,female),(Jim,male),(LiLy,female)],2)ages rdd_age.collect()broads sc.broadcast(ages)def get_age(it): result [] ages dict(broads.value) for x in it: name x[0] age ages.get(name,0) result.append((x[0],age,x[1])) return iter(result)rdd_students rdd_gender.mapPartitions(get_age)print(rdd_students.collect())[(LiLei, 18, male), (HanMeimei, 19, female), (Jim, 17, male), (LiLy, 20, female)]CPU times: user 14.3 ms, sys: 7.43 ms, total: 21.7 msWall time: 86.3 ms5reduceByKey/aggregateByKey代替groupByKeygroupByKey算子是一个低效的算子其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替通过在每个partition内部先做一次数据的合并操作大大减少了shuffle的数据量。%%time # 优化前:rdd_students sc.parallelize([(class1,LiLei),(class2,HanMeimei),(class1,Lucy), (class1,Ann),(class1,Jim),(class2,Lily)])rdd_names rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))names rdd_names.collect()print(names)[(class1, [LiLei, Lucy, Ann, Jim]), (class2, [HanMeimei, Lily])]CPU times: user 25.3 ms, sys: 7.32 ms, total: 32.6 msWall time: 164 ms%%time # 优化后:rdd_students sc.parallelize([(class1,LiLei),(class2,HanMeimei),(class1,Lucy), (class1,Ann),(class1,Jim),(class2,Lily)])rdd_names rdd_students.aggregateByKey([],lambda arr,name:arr[name],lambda arr1,arr2:arr1arr2)names rdd_names.collect()print(names)[(class1, [LiLei, Lucy, Ann, Jim]), (class2, [HanMeimei, Lily])]CPU times: user 21.6 ms, sys: 6.63 ms, total: 28.3 msWall time: 118 ms公众号后台回复关键字pyspark获取《eat pyspark in 10 days》 github项目地址。