BroadcastExchangeExec.scala

@@ -22,15 +22,13 @@ import scala.concurrent.duration._
 import scala.util.control.NonFatal

 import org.apache.spark.{broadcast, SparkException}
-import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.HashedRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SparkFatalException, ThreadUtils}
/**
@@ -112,17 +110,10 @@ case class BroadcastExchangeExec(
           SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
           broadcasted
         } catch {
-          // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
-          // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
-          // will catch this exception and re-throw the wrapped fatal throwable.
-          case oe: OutOfMemoryError =>
-            throw new SparkFatalException(
-              new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
-                s"all worker nodes. As a workaround, you can either disable broadcast by setting " +
-                s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark driver " +
-                s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
-                .initCause(oe.getCause))
           case e if !NonFatal(e) =>
+            // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
+            // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
+            // will catch this exception and re-throw the wrapped fatal throwable.
             throw new SparkFatalException(e)
         }
       }
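For context on the SPARK-24294 comment kept above: fatal errors thrown inside the Future body would not complete the Future because of scala/bug#9554, so they are wrapped in SparkFatalException (a plain Exception) and unwrapped again by ThreadUtils.awaitResult on the waiting side. Below is a minimal, self-contained sketch of that pattern; FatalWrapper and awaitResult are simplified stand-ins for the real org.apache.spark.util.SparkFatalException and ThreadUtils.awaitResult, not the actual Spark implementations.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FatalWrapperSketch {
  // Simplified stand-in for SparkFatalException: a plain Exception that carries a
  // fatal Throwable across the Future boundary.
  final class FatalWrapper(val throwable: Throwable) extends Exception(throwable)

  // Simplified stand-in for ThreadUtils.awaitResult: wait for the Future, then unwrap
  // and rethrow the original fatal error on the calling thread.
  def awaitResult[T](future: Future[T], atMost: Duration): T =
    try {
      Await.result(future, atMost)
    } catch {
      case fw: FatalWrapper => throw fw.throwable
    }

  def main(args: Array[String]): Unit = {
    val f: Future[Int] = Future {
      try {
        // Simulate the kind of fatal error that building the broadcast relation could hit.
        throw new OutOfMemoryError("simulated driver OOM")
      } catch {
        // A fatal error would not complete the Future (scala/bug#9554), so wrap it in a
        // non-fatal Exception that the Future machinery records as an ordinary failure.
        case e if !NonFatal(e) => throw new FatalWrapper(e)
      }
    }
    try awaitResult(f, 10.seconds)
    catch { case oe: OutOfMemoryError => println(s"caller saw the original error: $oe") }
  }
}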
HashedRelation.scala

@@ -24,10 +24,12 @@ import com.esotericsoftware.kryo.io.{Input, Output}

 import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{MemoryConsumer, StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.map.BytesToBytesMap
@@ -106,11 +108,20 @@ private[execution] object HashedRelation {
           1),
         0)
     }

-    if (key.length == 1 && key.head.dataType == LongType) {
-      LongHashedRelation(input, key, sizeEstimate, mm)
-    } else {
-      UnsafeHashedRelation(input, key, sizeEstimate, mm)
-    }
+    try {
+      if (key.length == 1 && key.head.dataType == LongType) {
+        LongHashedRelation(input, key, sizeEstimate, mm)
+      } else {
+        UnsafeHashedRelation(input, key, sizeEstimate, mm)
+      }
+    } catch {
+      case oe: SparkOutOfMemoryError =>
+        throw new SparkOutOfMemoryError(s"If this SparkOutOfMemoryError happens in Spark driver," +
+          s" it could be because there's not enough memory to build and broadcast the table to " +
+          s"all worker nodes. As a workaround, you can either disable the broadcast by setting " +
+          s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the Spark driver " +
+          s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
+          .initCause(oe.getCause)
+    }
   }

Review comments on this change:

Contributor: Ah, I see. So the SparkOutOfMemoryError is thrown by BytesToBytesMap; we need to catch and rethrow it to attach the error message anyway. I also found that we may throw an OOM when calling child.executeCollectIterator, which calls RDD#collect, so the previous code seems correct.

Author: So, should I change it back?

Contributor: It seems we don't need to change anything; maybe just add some comments to say where the OOM can occur, i.e. RDD#collect and BroadcastMode#transform.
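To make the reviewer's suggestion concrete, here is a rough sketch of what annotating the OOM-prone steps could look like. It is only an illustration: collectBuildSide and buildRelation are hypothetical stand-ins for child.executeCollectIterator (which calls RDD#collect) and BroadcastMode#transform, not Spark APIs.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object OomAnnotationSketch {
  // Hypothetical stand-in for child.executeCollectIterator, which calls RDD#collect
  // and materializes every build-side row in driver memory.
  def collectBuildSide(): Array[Array[Byte]] =
    Array.fill(1000)(Array.fill(64)(0: Byte))

  // Hypothetical stand-in for BroadcastMode#transform, which builds the hash relation
  // (backed by BytesToBytesMap) from the collected rows, again in driver memory.
  def buildRelation(rows: Array[Array[Byte]]): AnyRef = rows.toSeq

  val relationFuture: Future[AnyRef] = Future {
    // OOM can occur here: collecting the build side onto the driver (RDD#collect).
    val rows = collectBuildSide()
    // OOM can also occur here: building the hash relation (BroadcastMode#transform).
    buildRelation(rows)
  }
}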
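As a usage note on the workarounds the new message recommends (not part of this diff): below is roughly how a user might apply them, assuming a plain SparkSession setup. The keys are the ones named in the message, spark.sql.autoBroadcastJoinThreshold (SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key) and spark.driver.memory (SparkLauncher.DRIVER_MEMORY).

import org.apache.spark.sql.SparkSession

object BroadcastOomWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-oom-workaround")
      // -1 disables automatic broadcast joins entirely, avoiding the driver-side
      // build of the broadcast hash relation.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      // Raising driver memory is the alternative; note it only takes effect if set
      // before the driver JVM starts (e.g. via spark-submit --driver-memory 8g).
      .config("spark.driver.memory", "8g")
      .master("local[*]")
      .getOrCreate()

    spark.stop()
  }
}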