Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,14 +549,23 @@ specific to Spark on Kubernetes.
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.request.cores</code></td>
<td>(none)</td>
<td>
Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu).
Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in [CPU units](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units).
This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task
parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
</tr>
<tr>
<td><code>spark.kubernetes.executor.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
</td>
</tr>
<tr>
Expand Down Expand Up @@ -593,4 +602,4 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
</table>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.executor.request.cores")
.doc("Specify the cpu request for each executor pod")
.stringConf
.createOptional

val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ private[spark] class ExecutorPodFactory(
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB

private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
private val executorCores = sparkConf.getInt("spark.executor.cores", 1)
private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
} else {
executorCores.toString
}
private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)

/**
Expand Down Expand Up @@ -114,7 +119,7 @@ private[spark] class ExecutorPodFactory(
.withAmount(s"${executorMemoryWithOverhead}Mi")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCores.toString)
.withAmount(executorCoresRequest)
.build()
val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
new EnvVarBuilder()
Expand All @@ -133,8 +138,7 @@ private[spark] class ExecutorPodFactory(
}.getOrElse(Seq.empty[EnvVar])
val executorEnv = (Seq(
(ENV_DRIVER_URL, driverUrl),
// Executor backend expects integral value for executor cores, so round it up to an int.
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
(ENV_EXECUTOR_CORES, executorCores.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId),
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,33 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
}

test("executor core request specification") {
var factory = new ExecutorPodFactory(baseConf, None)
var executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
assert(executor.getSpec.getContainers.size() === 1)
assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
=== "1")

val conf = baseConf.clone()

conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "0.1")
factory = new ExecutorPodFactory(conf, None)
executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
assert(executor.getSpec.getContainers.size() === 1)
assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
=== "0.1")

conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
factory = new ExecutorPodFactory(conf, None)
conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
=== "100m")
}

test("executor pod hostnames get truncated to 63 characters") {
val conf = baseConf.clone()
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
Expand Down