From a5004badcea9527873e976a208a83abef2a73b66 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 9 May 2018 11:34:50 -0700 Subject: [PATCH 01/11] SPARK-25004: Add spark.executor.pyspark.memory limit. --- .../apache/spark/api/python/PythonRDD.scala | 14 +++++++++++- .../spark/api/python/PythonRunner.scala | 19 +++++++++++----- .../spark/internal/config/package.scala | 4 ++++ python/pyspark/worker.py | 22 +++++++++++++++++++ .../org/apache/spark/deploy/yarn/Client.scala | 9 +++++++- .../spark/deploy/yarn/YarnAllocator.scala | 9 +++++++- .../python/AggregateInPandasExec.scala | 13 +++++++++++ .../python/ArrowEvalPythonExec.scala | 2 ++ .../execution/python/ArrowPythonRunner.scala | 3 ++- .../python/BatchEvalPythonExec.scala | 4 +++- .../sql/execution/python/EvalPythonExec.scala | 14 +++++++++++- .../python/FlatMapGroupsInPandasExec.scala | 12 ++++++++++ .../python/PythonForeachWriter.scala | 13 ++++++++++- .../execution/python/PythonUDFRunner.scala | 5 +++-- .../execution/python/WindowInPandasExec.scala | 12 ++++++++++ 15 files changed, 141 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index c3db60a23f987..a963a10174223 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -37,6 +37,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -52,6 +53,17 @@ private[spark] class PythonRDD( val bufferSize = conf.getInt("spark.buffer.size", 65536) val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) + val memoryMb = { + val allocation = conf.get(PYSPARK_EXECUTOR_MEMORY) + if (reuseWorker) { + // the shared python worker gets the entire allocation + allocation + } else { + // each python worker gets an equal part of the allocation + allocation.map(_ / conf.getInt("spark.executor.cores", 1)) + } + } + override def getPartitions: Array[Partition] = firstParent.partitions override val partitioner: Option[Partitioner] = { @@ -61,7 +73,7 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { - val runner = PythonRunner(func, bufferSize, reuseWorker) + val runner = PythonRunner(func, bufferSize, reuseWorker, memoryMb) runner.compute(firstParent.iterator(split, context), split.index, context) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index f8241915e4849..bcfaba29a7598 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -65,7 +65,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( bufferSize: Int, reuseWorker: Boolean, evalType: Int, - argOffsets: Array[Array[Int]]) + argOffsets: Array[Array[Int]], + pythonMemoryMb: Option[Long]) extends Logging { require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") @@ -95,6 +96,9 @@ private[spark] abstract class 
BasePythonRunner[IN, OUT]( if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") } + if (pythonMemoryMb.isDefined) { + envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", pythonMemoryMb.get.toString) + } val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) // Whether is the worker released into idle pool val released = new AtomicBoolean(false) @@ -485,8 +489,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private[spark] object PythonRunner { - def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = { - new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker) + def apply( + func: PythonFunction, + bufferSize: Int, + reuseWorker: Boolean, + pyMemoryMb: Option[Long]): PythonRunner = { + new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker, pyMemoryMb) } } @@ -496,9 +504,10 @@ private[spark] object PythonRunner { private[spark] class PythonRunner( funcs: Seq[ChainedPythonFunctions], bufferSize: Int, - reuseWorker: Boolean) + reuseWorker: Boolean, + pyMemoryMb: Option[Long]) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) { + funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0)), pyMemoryMb) { protected override def newWriterThread( env: SparkEnv, diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index daf3f070d72e9..8602bc8746081 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -114,6 +114,10 @@ package object config { .checkValue(_ >= 0, "The off-heap memory size must not be negative") .createWithDefault(0) + private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory") + .bytesConf(ByteUnit.MiB) + .createOptional + private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal() .booleanConf.createWithDefault(false) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d54a5b8e396ea..792b4af26ca86 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -22,6 +22,7 @@ import os import sys import time +import resource import socket import traceback @@ -263,6 +264,27 @@ def main(infile, outfile): isBarrier = read_bool(infile) boundPort = read_int(infile) secret = UTF8Deserializer().loads(infile) + + # set up memory limits + memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) + total_memory = resource.RLIMIT_AS + try: + (total_memory_limit, max_total_memory) = resource.getrlimit(total_memory) + msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory) + sys.stderr.write() + + if memory_limit_mb > 0 and total_memory_limit < 0: + # convert to bytes + total_memory_limit = memory_limit_mb * 1024 * 1024 + + msg = "Setting mem to {0} of max {1}\n".format(total_memory_limit, max_total_memory) + sys.stderr.write(msg) + resource.setrlimit(total_memory, (total_memory_limit, total_memory_limit)) + + except (resource.error, OSError) as e: + # not all systems support resource limits, so warn instead of failing + sys.stderr.write("WARN: Failed to set memory limit: {0}\n".format(e)) + # initialize global state taskContext = None if isBarrier: diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 75614a41e0b62..e27c4ee63e01d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -91,6 +91,13 @@ private[spark] class Client( private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt + private val isPython = sparkConf.get(IS_PYTHON_APP) + private val pysparkWorkerMemory: Int = if (isPython) { + sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else { + 0 + } + private val distCacheMgr = new ClientDistributedCacheManager() private val principal = sparkConf.get(PRINCIPAL).orNull @@ -333,7 +340,7 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = executorMemory + executorMemoryOverhead + val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory if (executorMem > maxMem) { throw new IllegalArgumentException(s"Required executor memory ($executorMemory" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 40f1222fcd83f..8a7551de7c088 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -133,10 +133,17 @@ private[yarn] class YarnAllocator( // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt + protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) { + sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + } else { + 0 + } // Number of cores per executor. 
protected val executorCores = sparkConf.get(EXECUTOR_CORES) // Resource capability requested for each executors - private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) + private[yarn] val resource = Resource.newInstance( + executorMemory + memoryOverhead + pysparkWorkerMemory, + executorCores) private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 88c9c026928e8..7f662e0f6a4ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -81,6 +82,17 @@ case class AggregateInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) + val memoryMb = { + val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) + if (reuseWorker) { + // the shared python worker gets the entire allocation + allocation + } else { + // each python worker gets an equal part of the allocation + allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) + } + } + val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -139,6 +151,7 @@ case class AggregateInPandasExec( pyFuncs, bufferSize, reuseWorker, + memoryMb, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, argOffsets, aggInputSchema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index 0bc21c0986e69..e184759564bdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -70,6 +70,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi funcs: Seq[ChainedPythonFunctions], bufferSize: Int, reuseWorker: Boolean, + pyMemoryMb: Option[Long], argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -84,6 +85,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi funcs, bufferSize, reuseWorker, + pyMemoryMb, PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 85b187159a3e6..1c5049dcc895a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -41,13 +41,14 @@ class ArrowPythonRunner( funcs: Seq[ChainedPythonFunctions], bufferSize: Int, reuseWorker: Boolean, + 
pyMemoryMb: Option[Long], evalType: Int, argOffsets: Array[Array[Int]], schema: StructType, timeZoneId: String, conf: Map[String, String]) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( - funcs, bufferSize, reuseWorker, evalType, argOffsets) { + funcs, bufferSize, reuseWorker, evalType, argOffsets, pyMemoryMb) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index f4d83e8dc7c2b..44def13b3c881 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -23,6 +23,7 @@ import net.razorvine.pickle.{Pickler, Unpickler} import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan @@ -38,6 +39,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi funcs: Seq[ChainedPythonFunctions], bufferSize: Int, reuseWorker: Boolean, + pyMemoryMb: Option[Long], argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -69,7 +71,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi // Output iterator for results from Python. val outputIterator = new PythonUDFRunner( - funcs, bufferSize, reuseWorker, PythonEvalType.SQL_BATCHED_UDF, argOffsets) + funcs, bufferSize, reuseWorker, PythonEvalType.SQL_BATCHED_UDF, argOffsets, pyMemoryMb) .compute(inputIterator, context.partitionId(), context) val unpickle = new Unpickler diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 04c7dfdd4e204..babe13b78180c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.ChainedPythonFunctions +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -80,6 +81,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil funcs: Seq[ChainedPythonFunctions], bufferSize: Int, reuseWorker: Boolean, + pyMemoryMb: Option[Long], argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -89,6 +91,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil val inputRDD = child.execute().map(_.copy()) val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) + val memoryMb = { + val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) + if (reuseWorker) { + // the shared python worker gets the entire allocation + allocation + } else { + // each python worker gets an equal part of the allocation + allocation.map(_ / 
inputRDD.conf.getInt("spark.executor.cores", 1)) + } + } inputRDD.mapPartitions { iter => val context = TaskContext.get() @@ -129,7 +141,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } val outputRowIterator = evaluate( - pyFuncs, bufferSize, reuseWorker, argOffsets, projectedRowIter, schema, context) + pyFuncs, bufferSize, reuseWorker, memoryMb, argOffsets, projectedRowIter, schema, context) val joined = new JoinedRow val resultProj = UnsafeProjection.create(output, output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index f5a563baf52df..08e0412126e34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -76,6 +77,16 @@ case class FlatMapGroupsInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) + val memoryMb = { + val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) + if (reuseWorker) { + // the shared python worker gets the entire allocation + allocation + } else { + // each python worker gets an equal part of the allocation + allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) + } + } val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -143,6 +154,7 @@ case class FlatMapGroupsInPandasExec( chainedFunc, bufferSize, reuseWorker, + memoryMb, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, dedupSchema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala index f08f816cbcca9..5cc2f1b183e30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala @@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -48,7 +49,17 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType) val conf = SparkEnv.get.conf val bufferSize = conf.getInt("spark.buffer.size", 65536) val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) - PythonRunner(func, bufferSize, reuseWorker) + val memoryMb = { + val allocation = conf.get(PYSPARK_EXECUTOR_MEMORY) + if (reuseWorker) { + // the shared python worker gets the entire allocation + allocation + } else { + // each python worker gets an 
equal part of the allocation + allocation.map(_ / conf.getInt("spark.executor.cores", 1)) + } + } + PythonRunner(func, bufferSize, reuseWorker, memoryMb) } private lazy val outputIterator = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index e28def1c4b423..aa9362551e946 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -32,9 +32,10 @@ class PythonUDFRunner( bufferSize: Int, reuseWorker: Boolean, evalType: Int, - argOffsets: Array[Array[Int]]) + argOffsets: Array[Array[Int]], + pyMemoryMb: Option[Long]) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, bufferSize, reuseWorker, evalType, argOffsets) { + funcs, bufferSize, reuseWorker, evalType, argOffsets, pyMemoryMb) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 47bfbde56bb3e..a2ca2742e5a25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -97,6 +98,16 @@ case class WindowInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) + val memoryMb = { + val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) + if (reuseWorker) { + // the shared python worker gets the entire allocation + allocation + } else { + // each python worker gets an equal part of the allocation + allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) + } + } val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -158,6 +169,7 @@ case class WindowInPandasExec( pyFuncs, bufferSize, reuseWorker, + memoryMb, PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF, argOffsets, windowInputSchema, From 306538b1cef623347f5fd3297b68e7bb28b75364 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 3 Aug 2018 12:00:04 -0700 Subject: [PATCH 02/11] SPARK-25004: Fix problems from review. 
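Review fixes: pass the message to sys.stderr.write, compare the current soft limit
against resource.RLIM_INFINITY instead of testing for a negative value, fix the
indentation of the new config entry, and make the YARN memory check error report
all three components of the container request. A rough sketch of that arithmetic
with illustrative numbers (4g of executor memory and 2g of Python memory are
assumptions, not values from this patch):

    // Illustration only: the per-container request that the updated message reports.
    // The overhead here assumes spark.executor.memoryOverhead is unset, i.e. 10% of
    // executor memory with a 384 MB floor.
    val executorMemoryMb = 4096                                       // 4g
    val overheadMb = math.max((0.10 * executorMemoryMb).toInt, 384)   // 409
    val pysparkMemoryMb = 2048                                        // 2g
    val containerRequestMb = executorMemoryMb + overheadMb + pysparkMemoryMb  // 6553
    // containerRequestMb must fit under yarn.scheduler.maximum-allocation-mb.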
--- .../scala/org/apache/spark/internal/config/package.scala | 4 ++-- python/pyspark/worker.py | 4 ++-- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8602bc8746081..7c2f601c9986a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -115,8 +115,8 @@ package object config { .createWithDefault(0) private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory") - .bytesConf(ByteUnit.MiB) - .createOptional + .bytesConf(ByteUnit.MiB) + .createOptional private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal() .booleanConf.createWithDefault(false) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 792b4af26ca86..5ecd1eb70429a 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -271,9 +271,9 @@ def main(infile, outfile): try: (total_memory_limit, max_total_memory) = resource.getrlimit(total_memory) msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory) - sys.stderr.write() + sys.stderr.write(msg) - if memory_limit_mb > 0 and total_memory_limit < 0: + if memory_limit_mb > 0 and total_memory_limit == resource.RLIM_INFINITY: # convert to bytes total_memory_limit = memory_limit_mb * 1024 * 1024 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e27c4ee63e01d..14e0a1ebcf570 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -342,10 +342,10 @@ private[spark] class Client( s"memory capability of the cluster ($maxMem MB per container)") val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory ($executorMemory" + - s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + - "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " + - "'yarn.nodemanager.resource.memory-mb'.") + throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " + + s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " + + s"the max threshold ($maxMem MB) of this cluster! Please check the values of " + + s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.") } val amMem = amMemory + amMemoryOverhead if (amMem > maxMem) { From 9535a6b947bba38d5e9244a8ccebb2837197894d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 3 Aug 2018 16:45:52 -0700 Subject: [PATCH 03/11] SPARK-25004: Move configuration into PythonRunner. 
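Rather than plumbing spark.buffer.size, spark.python.worker.reuse and the Python
memory allocation through every runner constructor, BasePythonRunner now reads
them from the SparkConf it is given, and the allocation is always divided across
the executor's cores because the worker pool grows to one Python worker per
concurrent task. A standalone sketch of that division (the helper name is
illustrative, not part of this patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY

    // Each concurrent Python worker gets an equal share of the executor's
    // spark.executor.pyspark.memory allocation, expressed in MiB.
    def pythonWorkerMemoryMb(conf: SparkConf): Option[Long] =
      conf.get(PYSPARK_EXECUTOR_MEMORY)
        .map(_ / conf.getInt("spark.executor.cores", 1))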
--- .../apache/spark/api/python/PythonRDD.scala | 17 +---------- .../spark/api/python/PythonRunner.scala | 28 ++++++++++--------- python/pyspark/worker.py | 8 +++--- .../python/AggregateInPandasExec.scala | 20 ++----------- .../python/ArrowEvalPythonExec.scala | 11 ++------ .../execution/python/ArrowPythonRunner.scala | 8 ++---- .../python/BatchEvalPythonExec.scala | 6 +--- .../sql/execution/python/EvalPythonExec.scala | 18 +----------- .../python/FlatMapGroupsInPandasExec.scala | 19 ++----------- .../python/PythonForeachWriter.scala | 16 +---------- .../execution/python/PythonUDFRunner.scala | 6 ++-- .../execution/python/WindowInPandasExec.scala | 19 ++----------- 12 files changed, 37 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a963a10174223..5b1cd72c24b3a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -37,7 +37,6 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -50,20 +49,6 @@ private[spark] class PythonRDD( isFromBarrier: Boolean = false) extends RDD[Array[Byte]](parent) { - val bufferSize = conf.getInt("spark.buffer.size", 65536) - val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) - - val memoryMb = { - val allocation = conf.get(PYSPARK_EXECUTOR_MEMORY) - if (reuseWorker) { - // the shared python worker gets the entire allocation - allocation - } else { - // each python worker gets an equal part of the allocation - allocation.map(_ / conf.getInt("spark.executor.cores", 1)) - } - } - override def getPartitions: Array[Partition] = firstParent.partitions override val partitioner: Option[Partitioner] = { @@ -73,7 +58,7 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { - val runner = PythonRunner(func, bufferSize, reuseWorker, memoryMb) + val runner = PythonRunner(func, conf) runner.compute(firstParent.iterator(split, context), split.index, context) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index bcfaba29a7598..a17ae77480599 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -28,6 +28,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.security.SocketAuthHelper +import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.util._ @@ -62,15 +63,20 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, evalType: Int, argOffsets: Array[Array[Int]], - pythonMemoryMb: Option[Long]) + conf: SparkConf) extends Logging { require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + private val 
bufferSize = conf.getInt("spark.buffer.size", 65536) + private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) + // each python worker gets an equal part of the allocation. the worker pool will grow to the + // number of concurrent tasks, which is determined by the number of cores in this executor. + private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + .map(_ / conf.getInt("spark.executor.cores", 1)) + // All the Python functions should have the same exec, version and envvars. protected val envVars = funcs.head.funcs.head.envVars protected val pythonExec = funcs.head.funcs.head.pythonExec @@ -96,8 +102,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") } - if (pythonMemoryMb.isDefined) { - envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", pythonMemoryMb.get.toString) + if (memoryMb.isDefined) { + envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString) } val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) // Whether is the worker released into idle pool @@ -491,10 +497,8 @@ private[spark] object PythonRunner { def apply( func: PythonFunction, - bufferSize: Int, - reuseWorker: Boolean, - pyMemoryMb: Option[Long]): PythonRunner = { - new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker, pyMemoryMb) + conf: SparkConf): PythonRunner = { + new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), conf) } } @@ -503,11 +507,9 @@ private[spark] object PythonRunner { */ private[spark] class PythonRunner( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, - pyMemoryMb: Option[Long]) + conf: SparkConf) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0)), pyMemoryMb) { + funcs, PythonEvalType.NON_UDF, Array(Array(0)), conf) { protected override def newWriterThread( env: SparkEnv, diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 5ecd1eb70429a..51c3e68b3f8fe 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -271,19 +271,19 @@ def main(infile, outfile): try: (total_memory_limit, max_total_memory) = resource.getrlimit(total_memory) msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory) - sys.stderr.write(msg) + print(msg, file=sys.stderr) if memory_limit_mb > 0 and total_memory_limit == resource.RLIM_INFINITY: # convert to bytes total_memory_limit = memory_limit_mb * 1024 * 1024 msg = "Setting mem to {0} of max {1}\n".format(total_memory_limit, max_total_memory) - sys.stderr.write(msg) + print(msg, file=sys.stderr) resource.setrlimit(total_memory, (total_memory_limit, total_memory_limit)) - except (resource.error, OSError) as e: + except (resource.error, OSError, ValueError) as e: # not all systems support resource limits, so warn instead of failing - sys.stderr.write("WARN: Failed to set memory limit: {0}\n".format(e)) + print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr) # initialize global state taskContext = None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 7f662e0f6a4ca..26499f8d94a50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -23,7 +23,6 @@ import 
scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -80,19 +79,6 @@ case class AggregateInPandasExec( override protected def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute() - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) - val memoryMb = { - val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) - if (reuseWorker) { - // the shared python worker gets the entire allocation - allocation - } else { - // each python worker gets an equal part of the allocation - allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) - } - } - val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -149,14 +135,12 @@ case class AggregateInPandasExec( val columnarBatchIter = new ArrowPythonRunner( pyFuncs, - bufferSize, - reuseWorker, - memoryMb, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, argOffsets, aggInputSchema, sessionLocalTimeZone, - pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context) + pythonRunnerConf, + sparkContext.conf).compute(projectedRowIter, context.partitionId(), context) val joinedAttributes = groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index e184759564bdd..a4d56a59cfe31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -68,9 +68,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi protected override def evaluate( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, - pyMemoryMb: Option[Long], argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -83,14 +80,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val columnarBatchIter = new ArrowPythonRunner( funcs, - bufferSize, - reuseWorker, - pyMemoryMb, PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema, sessionLocalTimeZone, - pythonRunnerConf).compute(batchIter, context.partitionId(), context) + pythonRunnerConf, + SparkEnv.get.conf).compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 1c5049dcc895a..f6708e5d2b9f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -39,16 +39,14 @@ import org.apache.spark.util.Utils */ class ArrowPythonRunner( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, - pyMemoryMb: Option[Long], evalType: Int, argOffsets: Array[Array[Int]], schema: StructType, timeZoneId: String, - conf: Map[String, String]) + conf: Map[String, String], + sparkConf: SparkConf) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( - funcs, bufferSize, reuseWorker, evalType, argOffsets, pyMemoryMb) { + funcs, evalType, argOffsets, sparkConf) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index 44def13b3c881..d3eb9a65686b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -23,7 +23,6 @@ import net.razorvine.pickle.{Pickler, Unpickler} import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan @@ -37,9 +36,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi protected override def evaluate( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, - pyMemoryMb: Option[Long], argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -71,7 +67,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi // Output iterator for results from Python. 
val outputIterator = new PythonUDFRunner( - funcs, bufferSize, reuseWorker, PythonEvalType.SQL_BATCHED_UDF, argOffsets, pyMemoryMb) + funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets, sparkContext.conf) .compute(inputIterator, context.partitionId(), context) val unpickle = new Unpickler diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index babe13b78180c..942a6db57416e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.ChainedPythonFunctions -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -79,9 +78,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil protected def evaluate( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, - pyMemoryMb: Option[Long], argOffsets: Array[Array[Int]], iter: Iterator[InternalRow], schema: StructType, @@ -89,18 +85,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil protected override def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute().map(_.copy()) - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) - val memoryMb = { - val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) - if (reuseWorker) { - // the shared python worker gets the entire allocation - allocation - } else { - // each python worker gets an equal part of the allocation - allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) - } - } inputRDD.mapPartitions { iter => val context = TaskContext.get() @@ -141,7 +125,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil } val outputRowIterator = evaluate( - pyFuncs, bufferSize, reuseWorker, memoryMb, argOffsets, projectedRowIter, schema, context) + pyFuncs, argOffsets, projectedRowIter, schema, context) val joined = new JoinedRow val resultProj = UnsafeProjection.create(output, output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index 08e0412126e34..b8c82cd407ffa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -75,18 +74,6 @@ case class FlatMapGroupsInPandasExec( override protected def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute() - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) - val memoryMb = { - val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) - if (reuseWorker) { - // the shared python worker gets the entire allocation - allocation - } else { - // each python worker gets an equal part of the allocation - allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) - } - } val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -152,14 +139,12 @@ case class FlatMapGroupsInPandasExec( val columnarBatchIter = new ArrowPythonRunner( chainedFunc, - bufferSize, - reuseWorker, - memoryMb, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, dedupSchema, sessionLocalTimeZone, - pythonRunnerConf).compute(grouped, context.partitionId(), context) + pythonRunnerConf, + sparkContext.conf).compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala index 5cc2f1b183e30..8fabfa31d8618 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala @@ -24,7 +24,6 @@ import java.util.concurrent.locks.ReentrantLock import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -46,20 +45,7 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType) } private lazy val pythonRunner = { - val conf = SparkEnv.get.conf - val bufferSize = conf.getInt("spark.buffer.size", 65536) - val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) - val memoryMb = { - val allocation = conf.get(PYSPARK_EXECUTOR_MEMORY) - if (reuseWorker) { - // the shared python worker gets the entire allocation - allocation - } else { - // each python worker gets an equal part of the allocation - allocation.map(_ / conf.getInt("spark.executor.cores", 1)) - } - } - PythonRunner(func, bufferSize, reuseWorker, memoryMb) + PythonRunner(func, SparkEnv.get.conf) } private lazy val outputIterator = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index aa9362551e946..76cc51f1f0ba0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -29,13 +29,11 @@ import org.apache.spark.api.python._ */ class PythonUDFRunner( funcs: Seq[ChainedPythonFunctions], - bufferSize: Int, - reuseWorker: Boolean, evalType: Int, argOffsets: Array[Array[Int]], - pyMemoryMb: Option[Long]) + conf: SparkConf) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, bufferSize, reuseWorker, evalType, argOffsets, pyMemoryMb) { + funcs, evalType, argOffsets, conf) { protected override def newWriterThread( env: SparkEnv, diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index a2ca2742e5a25..71ce8a7966f0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} -import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -96,18 +95,6 @@ case class WindowInPandasExec( protected override def doExecute(): RDD[InternalRow] = { val inputRDD = child.execute() - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) - val memoryMb = { - val allocation = inputRDD.conf.get(PYSPARK_EXECUTOR_MEMORY) - if (reuseWorker) { - // the shared python worker gets the entire allocation - allocation - } else { - // each python worker gets an equal part of the allocation - allocation.map(_ / inputRDD.conf.getInt("spark.executor.cores", 1)) - } - } val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) @@ -167,14 +154,12 @@ case class WindowInPandasExec( val windowFunctionResult = new ArrowPythonRunner( pyFuncs, - bufferSize, - reuseWorker, - memoryMb, PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF, argOffsets, windowInputSchema, sessionLocalTimeZone, - pythonRunnerConf).compute(pythonInput, context.partitionId(), context) + pythonRunnerConf, + sparkContext.conf).compute(pythonInput, context.partitionId(), context) val joined = new JoinedRow val resultProj = createResultProjection(expressions) From 5288f5b1c5b5845f5bd6a2d62082176e3d4b0bda Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 13 Aug 2018 11:22:26 -0700 Subject: [PATCH 04/11] SPARK-25004: Fix problems from review. 
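The worker now leaves the process limits alone unless a limit was actually
requested, and it never raises an existing limit: the new RLIMIT_AS value is
applied only when the current soft limit is RLIM_INFINITY or higher than the
requested one. AggregateInPandasExec also switches from sparkContext.conf to
SparkEnv.get.conf so the configuration is read from the executor-side
environment rather than through the plan's SparkContext. A Scala rendering of
the worker-side decision rule (the helper is hypothetical; the real check lives
in worker.py):

    // None models RLIM_INFINITY. Apply the requested limit only when no soft
    // limit is currently set or the requested one is stricter.
    def shouldApplyLimit(requestedBytes: Long, currentSoftLimit: Option[Long]): Boolean =
      currentSoftLimit.forall(requestedBytes < _)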
--- python/pyspark/worker.py | 17 +++++++++-------- .../python/AggregateInPandasExec.scala | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 51c3e68b3f8fe..228b3e07c647a 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -269,17 +269,18 @@ def main(infile, outfile): memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) total_memory = resource.RLIMIT_AS try: - (total_memory_limit, max_total_memory) = resource.getrlimit(total_memory) - msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory) - print(msg, file=sys.stderr) + if memory_limit_mb > 0: + (soft_limit, hard_limit) = resource.getrlimit(total_memory) + msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit) + print(msg, file=sys.stderr) - if memory_limit_mb > 0 and total_memory_limit == resource.RLIM_INFINITY: # convert to bytes - total_memory_limit = memory_limit_mb * 1024 * 1024 + new_limit = memory_limit_mb * 1024 * 1024 - msg = "Setting mem to {0} of max {1}\n".format(total_memory_limit, max_total_memory) - print(msg, file=sys.stderr) - resource.setrlimit(total_memory, (total_memory_limit, total_memory_limit)) + if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit: + msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit) + print(msg, file=sys.stderr) + resource.setrlimit(total_memory, (new_limit, new_limit)) except (resource.error, OSError, ValueError) as e: # not all systems support resource limits, so warn instead of failing diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 26499f8d94a50..21a7e2c941578 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -140,7 +140,7 @@ case class AggregateInPandasExec( aggInputSchema, sessionLocalTimeZone, pythonRunnerConf, - sparkContext.conf).compute(projectedRowIter, context.partitionId(), context) + SparkEnv.get.conf).compute(projectedRowIter, context.partitionId(), context) val joinedAttributes = groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute) From fbac4a5311a556683e696bc1aef283252af2001d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 13 Aug 2018 14:42:35 -0700 Subject: [PATCH 05/11] SPARK-25004: Optionally capture stdout/stderr in YarnClusterSuite. This is needed to debug the tests. 
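The run helper accepts an optional output file; when one is supplied, the
launcher's stdout is redirected to it and stderr is merged into stdout, and
checkResult attaches the captured output to its assertion message instead of
reporting only the final state. A reduced sketch of the launcher wiring (the
paths and class name are placeholders, not values from this patch):

    import java.io.File
    import org.apache.spark.launcher.SparkLauncher

    val outFile = new File("/tmp/spark-test-output.log")   // placeholder path
    val handle = new SparkLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setAppResource("/path/to/test-app.jar")             // placeholder
      .setMainClass("org.example.TestApp")                 // placeholder
      .redirectOutput(outFile)   // child stdout -> file
      .redirectError()           // merge child stderr into stdout
      .startApplication()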
--- .../deploy/yarn/BaseYarnClusterSuite.scala | 27 +++++++++++++------ .../spark/deploy/yarn/YarnClusterSuite.scala | 6 +++-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index b0abcc9149d08..3a7913122dd83 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -133,7 +133,8 @@ abstract class BaseYarnClusterSuite extraClassPath: Seq[String] = Nil, extraJars: Seq[String] = Nil, extraConf: Map[String, String] = Map(), - extraEnv: Map[String, String] = Map()): SparkAppHandle.State = { + extraEnv: Map[String, String] = Map(), + outFile: Option[File] = None): SparkAppHandle.State = { val deployMode = if (clientMode) "client" else "cluster" val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf) val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv @@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite } extraJars.foreach(launcher.addJar) + if (outFile.isDefined) { + launcher.redirectOutput(outFile.get) + launcher.redirectError() + } + val handle = launcher.startApplication() try { eventually(timeout(2 minutes), interval(1 second)) { @@ -179,17 +185,22 @@ abstract class BaseYarnClusterSuite * the tests enforce that something is written to a file after everything is ok to indicate * that the job succeeded. */ - protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = { - checkResult(finalState, result, "success") - } - protected def checkResult( finalState: SparkAppHandle.State, result: File, - expected: String): Unit = { - finalState should be (SparkAppHandle.State.FINISHED) + expected: String = "success", + outFile: Option[File] = None): Unit = { + // the context message is passed to assert as Any instead of a function. 
to lazily load the + // output from the file, this passes an anonymous object that loads it in toString when building + // an error message + val output = new Object() { + override def toString: String = outFile + .map(Files.toString(_, StandardCharsets.UTF_8)) + .getOrElse("(stdout/stderr was not captured)") + } + assert(finalState === SparkAppHandle.State.FINISHED, output) val resultString = Files.toString(result, StandardCharsets.UTF_8) - resultString should be (expected) + assert(resultString === expected, output) } protected def mainClassName(klass: Class[_]): String = { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d67f5d2768e49..58d11e96942e1 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -282,13 +282,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") val result = File.createTempFile("result", null, tempDir) + val outFile = Some(File.createTempFile("stdout", null, tempDir)) val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(), sparkArgs = Seq("--py-files" -> pyFiles), appArgs = Seq(result.getAbsolutePath()), extraEnv = extraEnvVars, - extraConf = extraConf) - checkResult(finalState, result) + extraConf = extraConf, + outFile = outFile) + checkResult(finalState, result, outFile = outFile) } private def testUseClassPathFirst(clientMode: Boolean): Unit = { From f11b3bbc3e39c250835b75fa9fa59e46282ab288 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Aug 2018 08:58:05 -0700 Subject: [PATCH 06/11] SPARK-25004: Use SparkEnv instead of SparkSession. --- .../spark/sql/execution/python/BatchEvalPythonExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index d3eb9a65686b8..6feb83c5fd552 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -67,7 +67,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi // Output iterator for results from Python. val outputIterator = new PythonUDFRunner( - funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets, sparkContext.conf) + funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets, SparkEnv.get.conf) .compute(inputIterator, context.partitionId(), context) val unpickle = new Unpickler From ac7de4aee3870e2b7f0b5dd77349a125020a3c3a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 20 Aug 2018 14:45:57 -0700 Subject: [PATCH 07/11] SPARK-25004: Fix remaining reference to conf on executors. 
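PythonRDD.compute runs on the executors, where the RDD's conf comes from its
SparkContext and is not usable (the context is not shipped with the task), so
the runner has to be built from SparkEnv.get.conf instead. A generic sketch of
the pattern, not code from this patch:

    import org.apache.spark.SparkEnv
    import org.apache.spark.rdd.RDD

    // Illustrative helper: configuration read inside a task goes through
    // SparkEnv.get, which resolves to the current JVM's environment on the
    // driver and on every executor.
    def bufferSizes(rdd: RDD[Array[Byte]]): RDD[Int] =
      rdd.mapPartitions { _ =>
        Iterator.single(SparkEnv.get.conf.getInt("spark.buffer.size", 65536))
      }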
--- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 5b1cd72c24b3a..6496ed4f9d41c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -58,7 +58,7 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { - val runner = PythonRunner(func, conf) + val runner = PythonRunner(func, SparkEnv.get.conf) runner.compute(firstParent.iterator(split, context), split.index, context) } From a38eac32e309a753a92ab2d74047b3cfd0382f56 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 21 Aug 2018 09:44:27 -0700 Subject: [PATCH 08/11] SPARK-25004: Remove renamining references to driver-side conf. --- .../sql/execution/python/FlatMapGroupsInPandasExec.scala | 4 ++-- .../spark/sql/execution/python/WindowInPandasExec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index b8c82cd407ffa..91243a57474d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -144,7 +144,7 @@ case class FlatMapGroupsInPandasExec( dedupSchema, sessionLocalTimeZone, pythonRunnerConf, - sparkContext.conf).compute(grouped, context.partitionId(), context) + SparkEnv.get.conf).compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 71ce8a7966f0f..18cc399e7a48f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -159,7 +159,7 @@ case class WindowInPandasExec( windowInputSchema, sessionLocalTimeZone, pythonRunnerConf, - sparkContext.conf).compute(pythonInput, context.partitionId(), context) + SparkEnv.get.conf).compute(pythonInput, context.partitionId(), context) val joined = new JoinedRow val resultProj = createResultProjection(expressions) From fcee94c22bebf1d55ba8ab9091b4bcb9852717b2 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 23 Aug 2018 09:48:54 -0700 Subject: [PATCH 09/11] SPARK-25004: Access SparkEnv.get.conf in PythonRunner. 
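With BasePythonRunner reading SparkEnv.get.conf directly, callers no longer need
to pass a SparkConf at all. From the application's side the whole feature is
driven by one setting; a usage sketch with illustrative values (2g and 4 cores
are assumptions, not values from this patch):

    import org.apache.spark.SparkConf

    // 2g of Python memory per executor spread over 4 concurrent workers.
    val conf = new SparkConf()
      .set("spark.executor.pyspark.memory", "2g")
      .set("spark.executor.cores", "4")
    // Each Python worker is started with PYSPARK_EXECUTOR_MEMORY_MB=512, and
    // worker.py lowers its RLIMIT_AS to 512 MiB when that is below the current
    // soft limit (or when no limit is set).

On YARN the same 2g is also added to the executor container request so the
Python processes have room to use it.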
--- .../apache/spark/api/python/PythonRDD.scala | 2 +- .../spark/api/python/PythonRunner.scala | 20 ++++++++----------- .../python/AggregateInPandasExec.scala | 3 +-- .../python/ArrowEvalPythonExec.scala | 5 ++--- .../execution/python/ArrowPythonRunner.scala | 5 ++--- .../python/BatchEvalPythonExec.scala | 5 ++--- .../python/FlatMapGroupsInPandasExec.scala | 5 ++--- .../python/PythonForeachWriter.scala | 2 +- .../execution/python/PythonUDFRunner.scala | 5 ++--- .../execution/python/WindowInPandasExec.scala | 3 +-- 10 files changed, 22 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 6496ed4f9d41c..197f4643e6134 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -58,7 +58,7 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { - val runner = PythonRunner(func, SparkEnv.get.conf) + val runner = PythonRunner(func) runner.compute(firstParent.iterator(split, context), split.index, context) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index a17ae77480599..834e68b054e02 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -27,8 +27,8 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.security.SocketAuthHelper import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY +import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -64,12 +64,12 @@ private[spark] object PythonEvalType { private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], evalType: Int, - argOffsets: Array[Array[Int]], - conf: SparkConf) + argOffsets: Array[Array[Int]]) extends Logging { require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + private val conf = SparkEnv.get.conf private val bufferSize = conf.getInt("spark.buffer.size", 65536) private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the @@ -89,7 +89,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private[spark] var serverSocket: Option[ServerSocket] = None // Authentication helper used when serving method calls via socket from Python side. - private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf) + private lazy val authHelper = new SocketAuthHelper(conf) def compute( inputIterator: Iterator[IN], @@ -495,21 +495,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private[spark] object PythonRunner { - def apply( - func: PythonFunction, - conf: SparkConf): PythonRunner = { - new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), conf) + def apply(func: PythonFunction): PythonRunner = { + new PythonRunner(Seq(ChainedPythonFunctions(Seq(func)))) } } /** * A helper class to run Python mapPartition in Spark. 
*/ -private[spark] class PythonRunner( - funcs: Seq[ChainedPythonFunctions], - conf: SparkConf) +private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions]) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, PythonEvalType.NON_UDF, Array(Array(0)), conf) { + funcs, PythonEvalType.NON_UDF, Array(Array(0))) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 21a7e2c941578..2ab7240556aaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -139,8 +139,7 @@ case class AggregateInPandasExec( argOffsets, aggInputSchema, sessionLocalTimeZone, - pythonRunnerConf, - SparkEnv.get.conf).compute(projectedRowIter, context.partitionId(), context) + pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context) val joinedAttributes = groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index a4d56a59cfe31..5af5de644d10d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -84,8 +84,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi argOffsets, schema, sessionLocalTimeZone, - pythonRunnerConf, - SparkEnv.get.conf).compute(batchIter, context.partitionId(), context) + pythonRunnerConf).compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index f6708e5d2b9f3..18992d7a9f974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -43,10 +43,9 @@ class ArrowPythonRunner( argOffsets: Array[Array[Int]], schema: StructType, timeZoneId: String, - conf: Map[String, String], - sparkConf: SparkConf) + conf: Map[String, String]) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( - funcs, evalType, argOffsets, sparkConf) { + funcs, evalType, argOffsets) { protected override def newWriterThread( env: SparkEnv, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index 6feb83c5fd552..d5320aac8d1dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -21,7 
+21,7 @@ import scala.collection.JavaConverters._ import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -66,8 +66,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi }.grouped(100).map(x => pickle.dumps(x.toArray)) // Output iterator for results from Python. - val outputIterator = new PythonUDFRunner( - funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets, SparkEnv.get.conf) + val outputIterator = new PythonUDFRunner(funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets) .compute(inputIterator, context.partitionId(), context) val unpickle = new Unpickler diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index 91243a57474d2..e9cff1a5a2007 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -143,8 +143,7 @@ case class FlatMapGroupsInPandasExec( argOffsets, dedupSchema, sessionLocalTimeZone, - pythonRunnerConf, - SparkEnv.get.conf).compute(grouped, context.partitionId(), context) + pythonRunnerConf).compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala index 8fabfa31d8618..a4e9b3305052f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala @@ -45,7 +45,7 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType) } private lazy val pythonRunner = { - PythonRunner(func, SparkEnv.get.conf) + PythonRunner(func) } private lazy val outputIterator = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index 76cc51f1f0ba0..cc61faa7e7051 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -30,10 +30,9 @@ import org.apache.spark.api.python._ class PythonUDFRunner( funcs: Seq[ChainedPythonFunctions], evalType: Int, - argOffsets: Array[Array[Int]], - conf: SparkConf) + argOffsets: Array[Array[Int]]) extends BasePythonRunner[Array[Byte], Array[Byte]]( - funcs, evalType, argOffsets, conf) { + funcs, evalType, argOffsets) { protected override def newWriterThread( env: SparkEnv, diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 18cc399e7a48f..27bed1137e5b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -158,8 +158,7 @@ case class WindowInPandasExec( argOffsets, windowInputSchema, sessionLocalTimeZone, - pythonRunnerConf, - SparkEnv.get.conf).compute(pythonInput, context.partitionId(), context) + pythonRunnerConf).compute(pythonInput, context.partitionId(), context) val joined = new JoinedRow val resultProj = createResultProjection(expressions) From bb8fecb19e2a231061f95a327a162e48df48e9cd Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 24 Aug 2018 11:25:06 -0700 Subject: [PATCH 10/11] SPARK-25004: Add docs for spark.executor.pyspark.memory setting. --- docs/configuration.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 0270dc2cfaf45..ad0992bb517e9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -179,6 +179,15 @@ of the most common options to set are: (e.g. 2g, 8g). + + spark.executor.pyspark.memory + Not set + + The amount of memory to be allocated to PySpark in each executor, in MiB + unless otherwise specified. If set, PySpark memory for an executor will be + limited to this amount. If not set, Spark will not limit Python's memory use. + + spark.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 From 0b275cfea7d83cdf61802da30c4a7604be8900e4 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 27 Aug 2018 14:30:55 -0700 Subject: [PATCH 11/11] SPARK-25004: Update configuration docs. --- docs/configuration.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index ad0992bb517e9..9714b48d5e69b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -185,7 +185,10 @@ of the most common options to set are: The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be - limited to this amount. If not set, Spark will not limit Python's memory use. + limited to this amount. If not set, Spark will not limit Python's memory use + and it is up to the application to avoid exceeding the overhead memory space + shared with other non-JVM processes. When PySpark is run in YARN, this memory + is added to executor resource requests.
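For completeness, an illustrative way to enable the new limit from application code; the 4g and 2g values are placeholders, and the same keys can equally be passed via spark-submit --conf:

    import org.apache.spark.SparkConf

    // Cap the JVM executor memory at 4g and give PySpark worker processes a
    // separate 2g budget per executor. Per the docs added above, on YARN the 2g
    // is added to the executor's resource request; if the key is left unset,
    // Spark does not limit Python's memory use.
    val conf = new SparkConf()
      .set("spark.executor.memory", "4g")
      .set("spark.executor.pyspark.memory", "2g")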