From a97187016c7f0711f71da6cb5b1a66ac791e843e Mon Sep 17 00:00:00 2001
From: Yinan Li
Date: Wed, 31 Jan 2018 13:29:12 -0800
Subject: [PATCH 1/4] [SPARK-23285][K8S] Allow fractional values for spark.executor.cores

K8s treats CPU as a "compressible resource" and can actually assign millicpus
to individual containers, e.g., 0.1 or 100m. In
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala#L94,
we already parse `spark.executor.cores` as a double value. This PR simply
bypasses the check for integral values of the property in Kubernetes mode.
---
 .../org/apache/spark/deploy/SparkSubmitArguments.scala | 6 +++++-
 docs/configuration.md                                  | 5 +++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9db7a1fe3106..487f8e1183a5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -267,7 +267,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
       SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
     }
-    if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
+    // Kubernetes mode allows fractional values for spark.executor.cores so bypass this check in
+    // the Kubernetes mode.
+    if (!master.startsWith("k8s")
+        && executorCores != null
+        && Try(executorCores.toInt).getOrElse(-1) <= 0) {
       SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
diff --git a/docs/configuration.md b/docs/configuration.md
index e7f2419cc2fa..6aea23fe8c9a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1220,14 +1220,15 @@ Apart from these, the following properties are also available, and may be useful
   spark.executor.cores
-  1 in YARN mode, all the available cores on the worker in
+  1 in YARN and Kubernetes modes, all the available cores on the worker in
   standalone and Mesos coarse-grained modes.
 
   The number of cores to use on each executor.
 
   In standalone and Mesos coarse-grained modes, for more detail, see
-  this description.
+  this description. In Kubernetes mode,
+  a fractional value can be used, e.g., 0.1 or 100m.
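
A note for reviewers on why the old check fails here: `String.toInt` rejects
anything non-integral, while `toDouble` accepts the fractional requests that
Kubernetes understands. A minimal sketch of the two parses (illustrative only,
not part of the patch):

    import scala.util.Try

    // The pre-patch validation parses the value as an Int, so a Kubernetes-style
    // fractional request like "0.1" falls back to -1 and is rejected, even though
    // K8s itself would happily schedule 100m of CPU for the container.
    val executorCores = "0.1"
    val asInt = Try(executorCores.toInt).getOrElse(-1)        // -1: "0.1" is not an Int
    val asDouble = Try(executorCores.toDouble).getOrElse(-1d) // 0.1: parses fine
    assert(asInt <= 0 && asDouble > 0d)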
From d9805c3e4d4795f866e72f3c30f8ca29db90761d Mon Sep 17 00:00:00 2001
From: Yinan Li
Date: Wed, 31 Jan 2018 15:47:35 -0800
Subject: [PATCH 2/4] Addressed comments
---
 .../org/apache/spark/deploy/SparkSubmitArguments.scala | 6 +-----
 docs/configuration.md                                  | 2 +-
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 487f8e1183a5..ffc9a49f36ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -267,11 +267,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
       SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
     }
-    // Kubernetes mode allows fractional values for spark.executor.cores so bypass this check in
-    // the Kubernetes mode.
-    if (!master.startsWith("k8s")
-        && executorCores != null
-        && Try(executorCores.toInt).getOrElse(-1) <= 0) {
+    if (executorCores != null && Try(executorCores.toDouble).getOrElse(-1d) <= 0d) {
       SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
diff --git a/docs/configuration.md b/docs/configuration.md
index 6aea23fe8c9a..8742c394cd4d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1228,7 +1228,7 @@ Apart from these, the following properties are also available, and may be useful
   In standalone and Mesos coarse-grained modes, for more detail, see
   this description. In Kubernetes mode,
-  a fractional value can be used, e.g., 0.1 or 100m.
+  a fractional value can be used, e.g., 0.1 (100 millicpus).
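
The revised check validates uniformly across cluster managers, since every
positive integer is also a positive double. A quick sketch of the post-patch
behavior (standalone rendering of the condition, for illustration):

    import scala.util.Try

    // Mirrors the new condition: any positive double passes, everything else fails.
    def validExecutorCores(s: String): Boolean = Try(s.toDouble).getOrElse(-1d) > 0d

    assert(validExecutorCores("2"))     // integral values still pass everywhere
    assert(validExecutorCores("0.1"))   // fractional values now pass for Kubernetes
    assert(!validExecutorCores("0"))    // zero is rejected
    assert(!validExecutorCores("abc"))  // unparsable falls back to -1 and is rejected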
From 44e489eca6c22254fa57ff6f4750e3af85c43e34 Mon Sep 17 00:00:00 2001
From: Yinan Li
Date: Thu, 1 Feb 2018 14:44:01 -0800
Subject: [PATCH 3/4] Fixed check in SparkConf
---
 core/src/main/scala/org/apache/spark/SparkConf.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f53b2bed74c6..4829183be519 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
+import com.google.common.math.DoubleMath
 import org.apache.avro.{Schema, SchemaNormalization}
 
 import org.apache.spark.deploy.history.config._
@@ -552,12 +553,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
     if (contains("spark.cores.max") && contains("spark.executor.cores")) {
       val totalCores = getInt("spark.cores.max", 1)
-      val executorCores = getInt("spark.executor.cores", 1)
-      val leftCores = totalCores % executorCores
-      if (leftCores != 0) {
-        logWarning(s"Total executor cores: ${totalCores} is not " +
-          s"divisible by cores per executor: ${executorCores}, " +
-          s"the left cores: ${leftCores} will not be allocated")
+      val executorCores = getDouble("spark.executor.cores", 1d)
+      if (!DoubleMath.isMathematicalInteger(totalCores / executorCores)) {
+        logWarning(s"Total executor cores: $totalCores is not " +
+          s"divisible by cores per executor: $executorCores")
       }
     }
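
For reference, Guava's `DoubleMath.isMathematicalInteger` is true only for
finite doubles with no fractional part, so the divisibility warning has to
fire on the negated result, as in the hunk above. A small sketch, assuming
Guava on the classpath (Spark core already depends on it):

    import com.google.common.math.DoubleMath

    // The warning should fire only when the quotient is NOT a mathematical integer.
    assert(DoubleMath.isMathematicalInteger(4.0 / 2.0))   // 2.0: divisible, no warning
    assert(DoubleMath.isMathematicalInteger(3.0 / 1.5))   // 2.0: fractional cores can divide evenly
    assert(!DoubleMath.isMathematicalInteger(5.0 / 2.0))  // 2.5: warn, one core stays unallocated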
From b21e8640142f2f22d169bf461238d34009cc503d Mon Sep 17 00:00:00 2001
From: Yinan Li
Date: Fri, 2 Feb 2018 08:58:27 -0800
Subject: [PATCH 4/4] Also allow spark.task.cpus to have fractional values
---
 .../org/apache/spark/ExecutorAllocationManager.scala         | 6 +++---
 .../scala/org/apache/spark/internal/config/package.scala     | 3 ++-
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala       | 6 ++++--
 .../cluster/CoarseGrainedSchedulerBackend.scala              | 4 ++--
 .../spark/scheduler/local/LocalSchedulerBackend.scala        | 4 ++--
 .../org/apache/spark/status/AppStatusListener.scala          | 8 +++++---
 core/src/main/scala/org/apache/spark/util/Utils.scala        | 9 +++++++++
 .../apache/spark/scheduler/TaskSchedulerImplSuite.scala      | 4 ++--
 .../test/scala/org/apache/spark/util/UtilsSuite.scala        | 7 +++++++
 docs/configuration.md                                        | 2 +-
 .../LocalityPreferredContainerPlacementStrategy.scala        | 2 +-
 11 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6c59038f2a6c..82359263b1d5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
+import org.apache.spark.internal.config.{CPUS_PER_TASK, DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -114,8 +114,8 @@ private[spark] class ExecutorAllocationManager(
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutor =
-    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+  private val tasksPerExecutor = Utils.getTasksPerExecutor(
+    conf.getDouble("spark.executor.cores", 1), conf.get(CPUS_PER_TASK))
 
   validateSettings()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bbfcfbaa7363..a62b42c4dbcd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -114,7 +114,8 @@ package object config {
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 
-  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
+  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").doubleConf
+    .createWithDefault(1d)
 
   private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c11806b3981..f40face43d69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSchedulerImpl(
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+  val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -344,7 +344,9 @@ private[spark] class TaskSchedulerImpl(
     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
+    val tasks = shuffledOffers.map { o =>
+      new ArrayBuffer[TaskDescription](Utils.getTasksPerExecutor(o.cores, CPUS_PER_TASK))
+    }
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
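
The buffer sizing above floors `o.cores / CPUS_PER_TASK`, matching the
`Utils.getTasksPerExecutor` helper introduced further down in this patch. A
sketch of the slot arithmetic with fractional task cpus (hypothetical `slots`
helper, for illustration only):

    import java.math.RoundingMode
    import com.google.common.math.DoubleMath

    // Slots per offer: floor of the offered cores over the cpus required per task.
    def slots(offerCores: Double, cpusPerTask: Double): Int =
      DoubleMath.roundToInt(offerCores / cpusPerTask, RoundingMode.FLOOR)

    assert(slots(3, 0.5) == 6)  // a 3-core offer fits six 0.5-cpu tasks
    assert(slots(3, 2) == 1)    // but only one 2-cpu task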
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4d75063fbf1c..7d67f13056f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -123,7 +123,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       if (TaskState.isFinished(state)) {
         executorDataMap.get(executorId) match {
           case Some(executorInfo) =>
-            executorInfo.freeCores += scheduler.CPUS_PER_TASK
+            executorInfo.freeCores += scheduler.CPUS_PER_TASK.toInt
             makeOffers(executorId)
           case None =>
             // Ignoring the update since we don't know about the executor.
@@ -302,7 +302,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
       else {
         val executorData = executorDataMap(task.executorId)
-        executorData.freeCores -= scheduler.CPUS_PER_TASK
+        executorData.freeCores -= scheduler.CPUS_PER_TASK.toInt
 
         logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
           s"${executorData.executorHost}.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 4c614c5c0f60..e0e9e403c17c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -66,7 +66,7 @@ private[spark] class LocalEndpoint(
     case StatusUpdate(taskId, state, serializedData) =>
       scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
-        freeCores += scheduler.CPUS_PER_TASK
+        freeCores += scheduler.CPUS_PER_TASK.toInt
         reviveOffers()
       }
@@ -83,7 +83,7 @@ private[spark] class LocalEndpoint(
   def reviveOffers() {
     val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
-      freeCores -= scheduler.CPUS_PER_TASK
+      freeCores -= scheduler.CPUS_PER_TASK.toInt
       executor.launchTask(executorBackend, task)
     }
   }
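
One thing to weigh in the two backends above: `CPUS_PER_TASK.toInt` truncates
toward zero, so a fractional `spark.task.cpus` below 1 credits zero cores back
to `freeCores` when a task finishes. A sketch of the conversion behavior (an
observation about the patch, not a change to it):

    // Scala's Double-to-Int conversion truncates toward zero.
    assert(0.5d.toInt == 0)  // a 0.5-cpu task frees 0 integer cores
    assert(1.5d.toInt == 1)
    assert(2.0d.toInt == 2)  // integral configs behave exactly as before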
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 3e34bdc0c7b6..f469863ac7b6 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -27,11 +27,13 @@ import scala.collection.mutable.HashMap
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.scope._
+import org.apache.spark.util.Utils
 
 /**
  * A Spark listener that writes application information to a data store. The types written to the
@@ -51,7 +53,7 @@ private[spark] class AppStatusListener(
   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
-  private var coresPerTask: Int = 1
+  private var coresPerTask: Double = 1d
 
   // How often to update live entities. -1 means "never update" when replaying applications,
   // meaning only the last write will happen. For live applications, this avoids a few
@@ -147,7 +149,7 @@ private[spark] class AppStatusListener(
       details.getOrElse("System Properties", Nil),
       details.getOrElse("Classpath Entries", Nil))
 
-    coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
+    coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toDouble)
       .getOrElse(coresPerTask)
 
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
@@ -183,7 +185,7 @@ private[spark] class AppStatusListener(
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
-    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+    exec.maxTasks = Utils.getTasksPerExecutor(event.executorInfo.totalCores, coresPerTask)
     exec.executorLogs = event.executorInfo.logUrlMap
     liveUpdate(exec, System.nanoTime())
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 585330297314..4324e017fc6c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -45,6 +45,7 @@ import scala.util.matching.Regex
 import _root_.io.netty.channel.unix.Errors.NativeIoException
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.io.{ByteStreams, Files => GFiles}
+import com.google.common.math.DoubleMath
 import com.google.common.net.InetAddresses
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -2805,6 +2806,14 @@ private[spark] object Utils extends Logging {
       s"k8s://$resolvedURL"
     }
   }
+
+  /**
+   * Get the number of tasks an executor can take given the number of CPU cores allocated to the
+   * executor and the number of CPU cores per task.
+   */
+  def getTasksPerExecutor(cores: Double, taskCpus: Double): Int = {
+    DoubleMath.roundToInt(cores / taskCpus, RoundingMode.FLOOR)
+  }
 }
 
 private[util] object CallerContext extends Logging {
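
Two caveats on the new helper, neither addressed by the patch itself: the
`RoundingMode.FLOOR` reference assumes `java.math.RoundingMode` is imported in
Utils.scala (that is the enum Guava's `DoubleMath.roundToInt` expects), and
flooring a double quotient inherits IEEE rounding, so some fractional pairs
land just below the mathematical result. A sketch of one such case:

    import java.math.RoundingMode
    import com.google.common.math.DoubleMath

    // 0.3 / 0.1 evaluates to 2.9999999999999996 in double arithmetic, so flooring
    // yields 2 task slots rather than the mathematically expected 3.
    assert(0.3 / 0.1 != 3.0)
    assert(DoubleMath.roundToInt(0.3 / 0.1, RoundingMode.FLOOR) == 2)

The pairs exercised by the new test below (e.g., 0.1 and 0.05) happen to
divide exactly in binary, so they do not hit this edge.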
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 6003899bb7be..a8b52c523698 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -150,7 +150,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -180,7 +180,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("Scheduler does not crash when tasks are not serializable") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eaea6b030c15..d15070263b3d 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1167,6 +1167,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       Utils.checkAndGetK8sMasterUrl("k8s://foo://host:port")
     }
   }
+
+  test("get tasks per executor") {
+    assert(Utils.getTasksPerExecutor(1d, 1d) == 1)
+    assert(Utils.getTasksPerExecutor(2d, 0.5d) == 4)
+    assert(Utils.getTasksPerExecutor(0.1d, 0.05d) == 2)
+    assert(Utils.getTasksPerExecutor(0.5d, 0.6d) == 0)
+  }
 }
 
 private class SimpleExtension
diff --git a/docs/configuration.md b/docs/configuration.md
index 8742c394cd4d..6f59d88d33b3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1670,7 +1670,7 @@ Apart from these, the following properties are also available, and may be useful
   spark.task.cpus
   1
-  Number of cores to allocate for each task.
+  Number of cores to allocate for each task. Can have fractional values, e.g. 0.1 (100 millicpus), in Kubernetes mode.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 0a7a16f468fb..da0c964da016 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -157,7 +157,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
    */
   private def numExecutorsPending(numTasksPending: Int): Int = {
     val coresPerExecutor = resource.getVirtualCores
-    (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
+    (numTasksPending * sparkConf.get(CPUS_PER_TASK).toInt + coresPerExecutor - 1) / coresPerExecutor
   }
 
 /**
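
For contrast with the floor math elsewhere in this patch, the YARN placement
strategy above keeps integer arithmetic and rounds up, requesting enough
executors to cover all pending task cpus. A standalone rendering of the
formula (hypothetical parameter names, for illustration):

    // Ceiling division via the classic (a + b - 1) / b integer idiom.
    def numExecutorsPending(numTasksPending: Int, cpusPerTask: Int, coresPerExecutor: Int): Int =
      (numTasksPending * cpusPerTask + coresPerExecutor - 1) / coresPerExecutor

    assert(numExecutorsPending(10, 1, 4) == 3)  // 10 cpus needed, 4-core executors
    assert(numExecutorsPending(8, 2, 4) == 4)   // 16 cpus needed, so 4 executors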