跑TPCDS的Q4的时候,会出现如下问题;

2022-08-29 20:30:48,194 INFO [task-result-getter-2] org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Task 30.1 in stage 14.3 (TID 4943) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).  
2022-08-29 20:30:48,194 WARN [task-result-getter-1] org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66): Lost task 32.1 in stage 14.3 (TID 4944, emr-48gafxrehv5az4mb8n0n-core-6, executor 25): FetchFailed(null, shuffleId=12, mapId=-1, reduceId=38, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 12
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:893)
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:889)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:889)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

)

具体到Spark分布式作业的时候,会有部分stage失败,看失败的原因是:

ExecutorLostFailure (executor 17 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 28.1 GB of 28 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

也就是部分Spark作业执行的时候失败,然后reduce到driver的时候会出现shuffle的地方没有数据,从而引起前面的问题,我尝试增大Driver和Executor的大小到40GB的时候能够跑过,但是直观感觉不应该设置这么大的内存,因为我有大约400个并发。

仔细看了下错误的问题,看起来是用到了memoryOverhead,shuffle分为shuffle write和shuffle read两部分。

shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。
shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。
shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。
结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。
有时候即使不会导致JVM crash也会造成长时间的gc。

解决的方向是减少shuffle数据。

主要从代码层面着手,可以将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

修改分区

通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度适当提高这个值,例如500。

增加失败的重试次数和重试的时间间隔
通过spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如10。
通过spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如10s。

提高executor的内存
在spark-submit提交任务时,适当提高executor的memory值,例如15G或者20G。

同时还有一个方向就是堆外内存,对于spark来说Spark 对 JVM 的空间(Heap+Off-heap)进行了更为详细的分配,以充分利用内存。
同时,Spark 引入了Off-heap(TungSten)内存模式,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用(可以理解为是独立于JVM托管的Heap之外利用c-style的malloc从os分配到的memory。由于不再由JVM托管,通过高效的内存管理,可以避免JVM object overhead和Garbage collection的开销)。

在spark中默认的堆外内存是executorMemory * 0.10, with minimum of 384,最小的默认是384。

可以通过如下方式去修改:

set spark.yarn.driver.memoryOverhead=20g;
set spark.yarn.executor.memoryOverhead=20g;


扫码手机观看或分享: