
Commit e747bcf

jiangzho authored and dongjoon-hyun committed
[SPARK-48017] Add Spark application submission worker for operator
### What changes were proposed in this pull request?

This is a breakdown PR of #2, adding a submission worker implementation for SparkApplication.

### Why are the changes needed?

The Spark Operator needs a submission worker to convert its abstraction (the SparkApplication API) into k8s resource specs. This is a lightweight implementation based on Spark's native k8s integration. For now it is based on Spark 4.0.0-preview1, but it is expected to serve all Spark LTS versions. This is feasible because the worker covers only spec generation; Spark core jars are still brought in by application images. E2E tests will later be set up with the operator to ensure that. Per the SPIP doc, future operator versions may add more submission worker implementations based on different Spark versions to become 100% version-agnostic, at the cost of keeping multiple workers on standby.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #10 from jiangzho/worker.

Authored-by: zhou-jiang <zhou_jiang@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 91ecc93 · commit e747bcf

File tree

12 files changed: +847 −0 lines


build.gradle

Lines changed: 4 additions & 0 deletions
@@ -25,6 +25,10 @@ subprojects {
 
   repositories {
     mavenCentral()
+    // TODO(SPARK-48326) Upgrade submission worker base Spark version to 4.0.0-preview2
+    maven {
+      url "https://repository.apache.org/content/repositories/orgapachespark-1454/"
+    }
   }
 
   apply plugin: 'checkstyle'

gradle.properties

Lines changed: 7 additions & 0 deletions
@@ -18,17 +18,24 @@
 group=org.apache.spark.k8s.operator
 version=0.1.0
 
+# Caution: fabric8 version should be aligned with Spark dependency
 fabric8Version=6.12.1
 commonsLang3Version=3.14.0
 commonsIOVersion=2.16.1
 lombokVersion=1.18.32
 
+# Spark
+scalaVersion=2.13
+# TODO(SPARK-48326) Upgrade submission worker base Spark version to 4.0.0-preview2
+sparkVersion=4.0.0-preview1
+
 # Logging
 log4jVersion=2.22.1
 
 # Test
 junitVersion=5.10.2
 jacocoVersion=0.8.12
+mockitoVersion=5.11.0
 
 # Build Analysis
 checkstyleVersion=10.15.0

settings.gradle

Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
 rootProject.name = 'apache-spark-kubernetes-operator'
 include 'spark-operator-api'
+include 'spark-submission-worker'

spark-operator-api/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ dependencies {
 
   testImplementation platform("org.junit:junit-bom:$junitVersion")
   testImplementation 'org.junit.jupiter:junit-jupiter'
+  testRuntimeOnly "org.junit.platform:junit-platform-launcher"
 }
 
 test {

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java

Lines changed: 9 additions & 0 deletions
@@ -36,6 +36,7 @@
 import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
 
 public class ModelUtils {
@@ -107,4 +108,12 @@ public static boolean overrideExecutorTemplateEnabled(ApplicationSpec applicatio
         && applicationSpec.getExecutorSpec() != null
         && applicationSpec.getExecutorSpec().getPodTemplateSpec() != null;
   }
+
+  public static long getAttemptId(final SparkApplication app) {
+    long attemptId = 0L;
+    if (app.getStatus() != null && app.getStatus().getCurrentAttemptSummary() != null) {
+      attemptId = app.getStatus().getCurrentAttemptSummary().getAttemptInfo().getId();
+    }
+    return attemptId;
+  }
 }
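For illustration, a minimal sketch of the defaulting behavior above (hypothetical: it assumes `SparkApplication` can be instantiated with its no-arg constructor and that a fresh instance carries no status yet):

```java
// Hypothetical sketch: a freshly created application has no status yet,
// so getAttemptId falls back to the default attempt id 0.
SparkApplication app = new SparkApplication();
long attemptId = ModelUtils.getAttemptId(app); // 0L
```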
spark-submission-worker/build.gradle

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+dependencies {
+  implementation project(":spark-operator-api")
+
+  implementation("org.apache.spark:spark-kubernetes_$scalaVersion:$sparkVersion")
+
+  compileOnly("org.projectlombok:lombok:$lombokVersion")
+  annotationProcessor("org.projectlombok:lombok:$lombokVersion")
+
+  testImplementation platform("org.junit:junit-bom:$junitVersion")
+  testImplementation "org.mockito:mockito-core:$mockitoVersion"
+  testImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
+  testImplementation "io.fabric8:kubernetes-server-mock:$fabric8Version"
+  testRuntimeOnly "org.junit.platform:junit-platform-launcher"
+}
+
+test {
+  useJUnitPlatform()
+}
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import scala.Option;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.Config;
+import org.apache.spark.deploy.k8s.KubernetesDriverConf;
+import org.apache.spark.deploy.k8s.KubernetesVolumeUtils;
+import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+
+public class SparkAppDriverConf extends KubernetesDriverConf {
+  private SparkAppDriverConf(
+      SparkConf sparkConf,
+      String appId,
+      MainAppResource mainAppResource,
+      String mainClass,
+      String[] appArgs,
+      Option<String> proxyUser) {
+    super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null);
+  }
+
+  public static SparkAppDriverConf create(
+      SparkConf sparkConf,
+      String appId,
+      MainAppResource mainAppResource,
+      String mainClass,
+      String[] appArgs,
+      Option<String> proxyUser) {
+    // pre-create check only
+    KubernetesVolumeUtils.parseVolumesWithPrefix(
+        sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
+    return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser);
+  }
+
+  /** Applications managed by the operator have a deterministic resource name prefix. */
+  @Override
+  public String resourceNamePrefix() {
+    return appId();
+  }
+
+  /**
+   * Creates the name to be used by the driver config map. The name consists of
+   * `resourceNamePrefix` and the Spark instance type (driver). The operator proposes a
+   * `resourceNamePrefix` that leaves a naming-length margin for sub-resources to be qualified as
+   * DNS subdomain or label. In addition, the overall config map name length is governed by
+   * `KubernetesClientUtils.configMapName`, which ensures the name meets the DNS subdomain name
+   * length requirement.
+   *
+   * @return proposed name to be used by the driver config map
+   */
+  public String configMapNameDriver() {
+    return KubernetesClientUtils.configMapName(String.format("%s-spark-drv", resourceNamePrefix()));
+  }
+}
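A minimal usage sketch of the class above (hedged: the app id, main class, and conf values are illustrative; `JavaMainAppResource` is Spark's `MainAppResource` implementation from the native k8s integration):

```java
import scala.Option;

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
import org.apache.spark.k8s.operator.SparkAppDriverConf;

public class SparkAppDriverConfSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace", "default");
    SparkAppDriverConf conf =
        SparkAppDriverConf.create(
            sparkConf,
            "spark-pi-0",                            // appId proposed by the operator
            new JavaMainAppResource(Option.empty()), // no primary app resource
            "org.apache.spark.examples.SparkPi",
            new String[0],
            Option.empty());                         // no proxy user
    // resourceNamePrefix() == appId(), so the name is deterministic and
    // length-capped by KubernetesClientUtils, e.g. "spark-pi-0-spark-drv"
    System.out.println(conf.configMapNameDriver());
  }
}
```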
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java

Lines changed: 129 additions & 0 deletions

@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.collection.immutable.HashMap;
+import scala.collection.immutable.Map;
+import scala.jdk.CollectionConverters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.deploy.k8s.Config;
+import org.apache.spark.deploy.k8s.Constants;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.SparkPod;
+import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
+
+/**
+ * Represents the resources that would be directly launched by the operator. Based on the resolved
+ * org.apache.spark.deploy.k8s.KubernetesDriverSpec, it:
+ *
+ * <ul>
+ *   <li>Adds a ConfigMap as a resource for the driver
+ *   <li>Converts Scala types to Java for easier reference from the operator
+ * </ul>
+ *
+ * <p>This class is not thread-safe and is not expected to be shared among reconciler threads.
+ */
+public class SparkAppResourceSpec {
+  @Getter private final Pod configuredPod;
+  @Getter private final List<HasMetadata> driverPreResources;
+  @Getter private final List<HasMetadata> driverResources;
+  private final SparkAppDriverConf kubernetesDriverConf;
+
+  public SparkAppResourceSpec(
+      SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec kubernetesDriverSpec) {
+    this.kubernetesDriverConf = kubernetesDriverConf;
+    String namespace = kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key());
+    Map<String, String> confFilesMap =
+        KubernetesClientUtils.buildSparkConfDirFilesMap(
+                kubernetesDriverConf.configMapNameDriver(),
+                kubernetesDriverConf.sparkConf(),
+                kubernetesDriverSpec.systemProperties())
+            .$plus(new Tuple2<>(Config.KUBERNETES_NAMESPACE().key(), namespace));
+    SparkPod sparkPod = addConfigMap(kubernetesDriverSpec.pod(), confFilesMap);
+    this.configuredPod =
+        new PodBuilder(sparkPod.pod())
+            .editSpec()
+            .addToContainers(sparkPod.container())
+            .endSpec()
+            .build();
+    this.driverPreResources =
+        new ArrayList<>(
+            CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverPreKubernetesResources())
+                .asJava());
+    this.driverResources =
+        new ArrayList<>(
+            CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverKubernetesResources())
+                .asJava());
+    this.driverResources.add(
+        KubernetesClientUtils.buildConfigMap(
+            kubernetesDriverConf.configMapNameDriver(), confFilesMap, new HashMap<>()));
+    this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace));
+    this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace));
+  }
+
+  private void setNamespaceIfMissing(HasMetadata resource, String namespace) {
+    if (StringUtils.isNotEmpty(resource.getMetadata().getNamespace())) {
+      return;
+    }
+    resource.getMetadata().setNamespace(namespace);
+  }
+
+  private SparkPod addConfigMap(SparkPod pod, Map<String, String> confFilesMap) {
+    Container containerWithConfigMapVolume =
+        new ContainerBuilder(pod.container())
+            .addNewEnv()
+            .withName(Constants.ENV_SPARK_CONF_DIR())
+            .withValue(Constants.SPARK_CONF_DIR_INTERNAL())
+            .endEnv()
+            .addNewVolumeMount()
+            .withName(Constants.SPARK_CONF_VOLUME_DRIVER())
+            .withMountPath(Constants.SPARK_CONF_DIR_INTERNAL())
+            .endVolumeMount()
+            .build();
+    Pod podWithConfigMapVolume =
+        new PodBuilder(pod.pod())
+            .editSpec()
+            .addNewVolume()
+            .withName(Constants.SPARK_CONF_VOLUME_DRIVER())
+            .withNewConfigMap()
+            .withItems(
+                CollectionConverters.SeqHasAsJava(
+                        KubernetesClientUtils.buildKeyToPathObjects(confFilesMap))
+                    .asJava())
+            .withName(kubernetesDriverConf.configMapNameDriver())
+            .endConfigMap()
+            .endVolume()
+            .endSpec()
+            .build();
+    return new SparkPod(podWithConfigMapVolume, containerWithConfigMapVolume);
+  }
+}
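Putting the two classes together, a hedged end-to-end sketch (assumptions: Spark's package-private `KubernetesDriverBuilder` is reachable from operator code since both live under the `org.apache.spark` namespace, and the fabric8 `KubernetesClient` calls follow the 6.x API; this is not the operator's actual submission worker, which later PRs wire up):

```java
import io.fabric8.kubernetes.client.KubernetesClient;

import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
import org.apache.spark.k8s.operator.SparkAppDriverConf;
import org.apache.spark.k8s.operator.SparkAppResourceSpec;

public class SparkAppLaunchSketch {
  // Resolve the driver spec for a prepared conf and launch the resulting resources.
  static void launch(SparkAppDriverConf conf, KubernetesClient client) {
    // Spark's native builder applies its feature steps to produce the driver spec
    KubernetesDriverSpec driverSpec =
        new KubernetesDriverBuilder().buildFromFeatures(conf, client);
    SparkAppResourceSpec spec = new SparkAppResourceSpec(conf, driverSpec);

    // Pre-resources (e.g. RBAC, services) are expected to exist before the driver pod
    client.resourceList(spec.getDriverPreResources()).createOrReplace();
    client.pods().resource(spec.getConfiguredPod()).createOrReplace();
    // Remaining resources, including the generated SPARK_CONF_DIR ConfigMap
    client.resourceList(spec.getDriverResources()).createOrReplace();
  }
}
```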
