Merged
Changes from 1 commit · 28 commits
8d0bc50
ODP-2189 Upgrade snakeyaml version to 2.0
senthh Sep 4, 2024
02b3f43
Merge pull request #28 from acceldata-io/ODP-2187
senthh Sep 4, 2024
ed20b70
[SPARK-35579][SQL] Bump janino to 3.1.7
Jul 18, 2022
fc00bef
[SPARK-40633][BUILD] Upgrade janino to 3.1.9
LuciferYang Nov 23, 2022
8d830cf
ODP-2167 Upgrade janino version from 3.1.9 to 3.1.10
senthh Sep 4, 2024
62589f6
Merge pull request #29 from acceldata-io/ODP-2167
senthh Sep 4, 2024
d373fea
ODP-2190 Upgrade guava version to 32.1.3-jre
senthh Sep 5, 2024
e9220a5
Merge pull request #30 from acceldata-io/ODP-2190
senthh Sep 5, 2024
d5ee21c
ODP-2193 Upgrade jettison version to 1.5.4
senthh Sep 5, 2024
f783536
ODP-2194 Upgrade wildfly-openssl version to 1.1.3
senthh Sep 5, 2024
2aa244d
Merge pull request #31 from acceldata-io/ODP-2193
senthh Sep 6, 2024
4095f60
ODP-2198 Upgrade gson version to 2.11.0
senthh Sep 6, 2024
b6f0a73
ODP-2199 Upgrade kryo-shaded version to 4.0.3
senthh Sep 6, 2024
f776c04
ODP-2200 Upgrade datanucleus-core and datanucleus-rdbms versions to 5…
senthh Sep 6, 2024
13a9e8a
ODP-2203 Upgrade Snappy and common-compress to 1.1.10.4 and 1.26.0 re…
senthh Sep 6, 2024
eb5d103
ODP-2198 Excluded gson from tink library
senthh Sep 6, 2024
9695174
ODP-2205 Upgrade jdom2 to 2.0.6.1
senthh Sep 6, 2024
00b03f8
ODP-2198 Excluded gson from hive-exec
senthh Sep 6, 2024
7ca39ca
ODP-2175|SPARK-47018 Upgrade libthrift version and hive version
senthh Sep 9, 2024
b34fee6
Merge pull request #34 from acceldata-io/ODP-2175_1
senthh Sep 9, 2024
dcd700c
[SPARK-39688][K8S] `getReusablePVCs` should handle accounts with no P…
dongjoon-hyun Jul 5, 2022
eb05549
[SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1
attilapiros Sep 29, 2022
db280be
[SPARK-36462][K8S] Add the ability to selectively disable watching or…
holdenk Aug 18, 2022
1f69e5a
ODP-2201|SPARK-48867 Upgrade okhttp to 4.12.0, okio to 3.9.0 and esdk…
senthh Sep 9, 2024
bd710aa
[SPARK-41958][CORE][3.3] Disallow arbitrary custom classpath with pro…
degant Jun 6, 2023
62f9e94
ODP-2049 Changing Spark3 version from 3.3.3.3.2.3.2-2 to 3.3.3.3.2.3.…
senthh Sep 12, 2024
b483b9a
ODP-2049 Changing libthrift version to 0.16 in deps files
senthh Sep 12, 2024
14731b1
ODP-2049 Changing derby version to 10.14.3.0
senthh Sep 13, 2024
[SPARK-36462][K8S] Add the ability to selectively disable watching or polling

### What changes were proposed in this pull request?

Add the ability to selectively disable watching or polling

Updated version of apache#34264

### Why are the changes needed?

Watching or polling for pod status on Kubernetes can place additional load on etcd. With a large number of executors and a large number of jobs this can have a negative impact, and under normal operation executors register themselves with the driver anyway.

### Does this PR introduce _any_ user-facing change?

Yes: two new (internal) config flags, `spark.kubernetes.executor.enableApiPolling` and `spark.kubernetes.executor.enableApiWatcher`.
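
For illustration only (not part of the PR), a minimal sketch of how a user could opt out of both mechanisms; both flags default to `true` and are marked `internal()`, so they do not appear in the documented configuration list:

```scala
import org.apache.spark.SparkConf

// Hypothetical usage sketch: disable both executor-status sources on an
// etcd-constrained cluster. Executors still register with the driver.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.enableApiPolling", "false")
  .set("spark.kubernetes.executor.enableApiWatcher", "false")
```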

### How was this patch tested?

New unit tests + manually tested a forked version of this on an internal cluster with both watching and polling disabled.

Closes apache#36433 from holdenk/SPARK-36462-allow-spark-on-kube-to-operate-without-watchers.

Lead-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: Holden Karau <hkarau@netflix.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

(cherry picked from commit 5bffb98)
holdenk authored and senthh committed Sep 9, 2024
commit db280be7f3d04272f0c57a6f243d0e163321df77
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -454,6 +454,25 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val KUBERNETES_EXECUTOR_ENABLE_API_POLLING =
ConfigBuilder("spark.kubernetes.executor.enableApiPolling")
.doc("If Spark should poll Kubernetes for executor pod status. " +
"You should leave this enabled unless you're encountering issues with your etcd.")
.version("3.4.0")
.internal()
.booleanConf
.createWithDefault(true)

val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER =
ConfigBuilder("spark.kubernetes.executor.enableApiWatcher")
.doc("If Spark should create watchers for executor pod status. " +
"You should leave this enabled unless you're encountering issues with your etcd.")
.version("3.4.0")
.internal()
.booleanConf
.createWithDefault(true)


val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
.doc("Interval between polls against the Kubernetes API server to inspect the " +
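A sketch of how these entries behave, assuming Spark-internal code (the typed `conf.get(entry)`/`conf.set(entry, value)` accessors are `private[spark]`, so this is illustrative rather than public API):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

// Both entries default to true, so an untouched SparkConf keeps the
// existing watch-and-poll behavior.
val conf = new SparkConf()
assert(conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING))
assert(conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER))

// Opting out, as the SPARK-36462 tests later in this diff do:
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false)
assert(!conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING))
```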
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -45,15 +45,18 @@ class ExecutorPodsPollingSnapshotSource(
pollingExecutor: ScheduledExecutorService) extends Logging {

private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)

private var pollingFuture: Future[_] = _

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
if (pollingEnabled) {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}
}

@Since("3.1.3")
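The effect of the change above: when polling is disabled, `start` becomes a no-op and `pollingFuture` stays `null`. A condensed sketch of the gating, with placeholder names that are not from the PR:

```scala
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}

// Minimal sketch of the guard added above (names are illustrative).
class PollingSource(
    enabled: Boolean,
    executor: ScheduledExecutorService,
    intervalMs: Long,
    poll: Runnable) {

  private var future: Future[_] = _

  def start(): Unit = if (enabled) {
    require(future == null, "Cannot start polling more than once.")
    future = executor.scheduleWithFixedDelay(
      poll, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }
}
```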
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -22,9 +22,9 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -40,13 +40,12 @@ import org.apache.spark.util.Utils
@DeveloperApi
class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {
kubernetesClient: KubernetesClient,
conf: SparkConf) extends Logging {

private var watchConnection: Closeable = _
private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)

private val namespace = conf.get(KUBERNETES_NAMESPACE)

// If we're constructed with the old API get the SparkConf from the running SparkContext.
def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
@@ -59,7 +58,6 @@ class ExecutorPodsWatchSnapshotSource(
logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
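The three-argument primary constructor plus the auxiliary `def this(...)` keep the old two-argument `@DeveloperApi` signature working. A generic sketch of that pattern, with placeholder types that are not from the PR (the PR uses the Spark-internal `conf` accessor; `getConf` is the public equivalent):

```scala
import org.apache.spark.{SparkConf, SparkContext}

trait Store   // stand-in for ExecutorPodsSnapshotsStore
trait Client  // stand-in for KubernetesClient

class WatchSource(store: Store, client: Client, conf: SparkConf) {
  // Callers compiled against the old two-argument API still work; they
  // implicitly pick up the SparkConf of the running SparkContext.
  def this(store: Store, client: Client) =
    this(store, client, SparkContext.getOrCreate().getConf)
}
```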
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient)
kubernetesClient,
sc.conf)

val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {

private val sparkConf = new SparkConf
private val defaultConf = new SparkConf()

private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

@Mock
private var kubernetesClient: KubernetesClient = _
@@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
before {
MockitoAnnotations.openMocks(this).close()
pollingExecutor = new DeterministicScheduler()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
@@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
}

test("Items returned by the API should be pushed to the event queue") {
val sparkConf = new SparkConf()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
when(activeExecutorPods.list())
@@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
}

test("SPARK-36462: If polling is disabled we don't call pods() on the client") {
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(kubernetesClient, never()).pods()
}

test("SPARK-36334: Support pod listing with resource version") {
Seq(true, false).foreach { value =>
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
if (value) {
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())
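A note on the harness these tests rely on: jMock's `DeterministicScheduler` only runs scheduled work when the test advances time, which is what makes `verify(kubernetesClient, never()).pods()` a meaningful assertion. A minimal self-contained sketch:

```scala
import java.util.concurrent.TimeUnit
import org.jmock.lib.concurrent.DeterministicScheduler

// Time advances only inside tick(), so scheduled tasks run synchronously
// and exactly as often as the test dictates.
val scheduler = new DeterministicScheduler()
val task: Runnable = () => println("poll tick")
scheduler.scheduleWithFixedDelay(task, 30L, 30L, TimeUnit.MILLISECONDS)
scheduler.tick(30L, TimeUnit.MILLISECONDS) // executes exactly one poll
```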
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
@@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
@@ -65,17 +67,27 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
}

test("Watch events should be pushed to the snapshots store as snapshot updates.") {
val conf = new SparkConf()
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
watch.getValue.eventReceived(Action.ADDED, exec1)
watch.getValue.eventReceived(Action.MODIFIED, exec2)
verify(eventQueue).updatePod(exec1)
verify(eventQueue).updatePod(exec2)
}

test("SPARK-36462: Verify if watchers are disabled we don't call pods() on the client") {
val conf = new SparkConf()
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
verify(kubernetesClient, never()).pods()
}
}