Problem stack trace:

2022-08-30 13:40:14,500 INFO [dag-scheduler-event-loop] org.apache.spark.internal.Logging.logInfo(Logging.scala:57): ResultStage 25 (UnionRDD (Map 26 (1), Map 25 (1))) failed in 121.779 s due to Job aborted due to stage failure: Task 1 in stage 25.0 failed 4 times, most recent failure: Lost task 1.3 in stage 25.0 (TID 5480) (emr-57egdjjry556oso56eow-core-5 executor 9): java.lang.RuntimeException: Error processing row: org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError: 2022-08-30 13:40:14	Processing rows:	200000	Hashtable size:	199999	Memory usage:	10463745568	percentage:	0.777
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:149)
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.rdd.AsyncRDDActions.$anonfun$foreachAsync$2(AsyncRDDActions.scala:127)
at org.apache.spark.rdd.AsyncRDDActions.$anonfun$foreachAsync$2$adapted(AsyncRDDActions.scala:127)
at org.apache.spark.SparkContext.$anonfun$submitJob$1(SparkContext.scala:2363)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError: 2022-08-30 13:40:14 Processing rows: 200000 Hashtable size: 199999 Memory usage: 10463745568 percentage: 0.777
at org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler.checkMemoryStatus(MapJoinMemoryExhaustionHandler.java:99)
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.process(HashTableSinkOperator.java:259)
at org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator.process(SparkHashTableSinkOperator.java:85)
at org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator.process(VectorSparkHashTableSinkOperator.java:109)
at org.apache.hadoop.hive.ql.exec.Operator.vectorForward(Operator.java:966)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:939)
at org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.process(VectorSelectOperator.java:158)
at org.apache.hadoop.hive.ql.exec.Operator.vectorForward(Operator.java:966)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:939)
at org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator.process(VectorFilterOperator.java:136)
at org.apache.hadoop.hive.ql.exec.Operator.vectorForward(Operator.java:966)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:939)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:125)
at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.flushDeserializerBatch(VectorMapOperator.java:630)
at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.setupPartitionContextVars(VectorMapOperator.java:698)
at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.cleanUpInputFileChangedOp(VectorMapOperator.java:607)
at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1225)
at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(VectorMapOperator.java:828)
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:136)
... 18 more

The job failed because executing a MapJoin exhausted memory, leaving the task unable to continue.

MapJoin is typically used when joining a very small table with a large one. How small the small table must be is controlled by the parameter hive.mapjoin.smalltable.filesize, which caps the small table's total size; the default is 25000000 bytes, i.e. 25 MB.
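As a quick reference, the threshold can be inspected and tuned per session in the Hive CLI or Beeline; the 10000000 value below is purely illustrative, not a recommendation:

set hive.mapjoin.smalltable.filesize;           -- print the current value
set hive.mapjoin.smalltable.filesize=10000000;  -- lower the threshold to roughly 10 MB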

The MapJoin optimization works as follows: before the MapReduce tasks start, Hive launches a local task that loads the small table into memory as a hashtable, serializes it to disk, and compresses the in-memory hashtable into a tar file. The file is then placed in the Hadoop Distributed Cache and shipped to every mapper; each mapper deserializes the file locally, loads it back into memory, and performs the join there.
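This plan shape can be observed with EXPLAIN: the small side appears as a local HashTable Sink stage (the very operator in the stack trace above) instead of a shuffle. A minimal sketch, assuming hypothetical tables big_t and small_t:

set hive.auto.convert.join=true;  -- allow Hive to convert the join to a MapJoin
EXPLAIN
SELECT b.id, s.name
FROM big_t b
JOIN small_t s ON b.id = s.id;    -- small_t becomes the in-memory hashtable side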

The TPC-DS HQL statements do indeed contain multiple LEFT OUTER JOINs. Based on that, a reasonable guess is that memory was exhausted while loading a "small" table for one of them.
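For context, this is the general shape of such a join (it follows the TPC-DS store_sales/store_returns schema, but is a schematic, not the exact failing query):

SELECT ss.ss_ticket_number, sr.sr_return_amt
FROM store_sales ss
LEFT OUTER JOIN store_returns sr
  ON ss.ss_ticket_number = sr.sr_ticket_number
 AND ss.ss_item_sk = sr.sr_item_sk;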

One option is to raise hive.mapjoin.localtask.max.memory.usage, the fraction of memory the local task may use, from its default of 0.9 up to 0.999. But this clearly treats neither the symptom nor the root cause: the table simply does not fit in memory.
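For completeness, that stop-gap would look like this (again, it only delays the MapJoinMemoryExhaustionError seen above):

set hive.mapjoin.localtask.max.memory.usage=0.999;  -- default 0.9; the local task aborts once usage exceeds this fraction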

My solution:

set hive.auto.convert.join=false; -- disable MapJoin auto-conversion (this parameter has defaulted to true since Hive 0.11.0)

Lower hive.mapjoin.smalltable.filesize (older name: hive.smalltable.filesize); the default is 25000000 bytes (as of Hive 2.0.0).

set hive.ignore.mapjoin.hint=false; -- stop ignoring MAPJOIN hints, so an explicit /*+ MAPJOIN(t) */ hint is still honored

Hive first decides that one of the tables in the join qualifies as the small table, and only then loads it into memory; a consolidated sketch of the fix follows below.
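Putting the pieces together, the session settings for the fix (the filesize value is illustrative):

set hive.auto.convert.join=false;               -- no automatic MapJoin conversion
set hive.ignore.mapjoin.hint=false;             -- explicit /*+ MAPJOIN(...) */ hints still work
set hive.mapjoin.smalltable.filesize=10000000;  -- tighten the small-table threshold

With auto-conversion off, the join falls back to a common (shuffle) join, which trades speed for the ability to spill to disk instead of holding the whole hashtable in memory.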

