diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 66ac9ddb21aaa..97a3b8a6f6258 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -156,6 +156,14 @@ object StaticSQLConf {
       .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
       .createWithDefault(16)
 
+  val DYNAMIC_PRUNING_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.dynamic.pruning.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to execute the dynamic pruning.")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
+      .createWithDefault(16)
+
   val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
     .doc("Threshold of SQL length beyond which it will be truncated before adding to " +
       "event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
index e9f2f6a2cdfaf..34ccd50e280ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.joins.{HashedRelation, HashJoin, LongHashedRelation}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -57,7 +58,9 @@ case class SubqueryBroadcastExec(
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
+      sqlContext.sparkSession,
+      SubqueryBroadcastExec.executionContext) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -83,7 +86,7 @@ case class SubqueryBroadcastExec(
 
       rows
     }
-    }(SubqueryBroadcastExec.executionContext)
+    }
   }
 
   protected override def doPrepare(): Unit = {
@@ -104,5 +107,6 @@ case class SubqueryBroadcastExec(
 
 object SubqueryBroadcastExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("dynamicpruning", 16))
+    ThreadUtils.newDaemonCachedThreadPool("dynamicpruning",
+      SQLConf.get.getConf(StaticSQLConf.DYNAMIC_PRUNING_MAX_THREAD_THRESHOLD)))
 }
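
A note on the two pieces of the patch: `spark.sql.dynamic.pruning.maxThreadThreshold` is a static, internal conf, so it is read once at startup and cannot be changed on a running session; and `SQLExecution.withThreadLocalCaptured` replaces the bare `Future` so the dynamic-pruning job runs with the submitting thread's session state and local properties. A minimal usage sketch for the new conf follows; the app name and the value 32 are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: static SQL confs must be set before the first SparkSession
// is created; SubqueryBroadcastExec reads this one when it lazily builds
// its "dynamicpruning" thread pool.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dpp-pool-sizing") // hypothetical app name
  // Raise the pool cap from the default 16; checkValue enforces (0, 128].
  .config("spark.sql.dynamic.pruning.maxThreadThreshold", "32")
  .getOrCreate()
```

Because `SubqueryBroadcastExec.executionContext` lives in a companion object, `SQLConf.get` is consulted once when that object initializes, so whatever value is in effect at that point fixes the pool size for the lifetime of the JVM.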