Rename inconsistent class prefix to SparkApp, and misc clean up.
jiangzho committed Apr 19, 2024
commit 110e1a8ff517ab74925a552297cdaa04bbda0c09
2 changes: 1 addition & 1 deletion spark-operator-docs/developer_guide.md
@@ -80,5 +80,5 @@ For now, in order to manually run e2e tests:
 ```shell
 java -cp /path/to/spark-operator-test.jar \
 -Dspark.operator.test.app.yaml.files.dir=/path/to/e2e-tests/ \
-org.apache.spark.kubernetes.operator.AppSubmitToSucceedTest
+org.apache.spark.kubernetes.operator.SparkAppSubmitToSucceedTest
 ```
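The test harness also honors `spark.operator.test.app.namespace` (its default changes from `spark-test` to `default` in this commit, per the test diff below), so a run against a dedicated namespace might look like the following sketch — placeholder paths as above, with `my-test-ns` as a hypothetical namespace:

```shell
java -cp /path/to/spark-operator-test.jar \
-Dspark.operator.test.app.yaml.files.dir=/path/to/e2e-tests/ \
-Dspark.operator.test.app.namespace=my-test-ns \
org.apache.spark.kubernetes.operator.SparkAppSubmitToSucceedTest
```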
6 changes: 4 additions & 2 deletions spark-operator-docs/getting_started.md
@@ -139,7 +139,6 @@ spec:
     spark.executor.instances: "5"
     spark.kubernetes.authenticate.driver.serviceAccountName: spark
     spark.kubernetes.container.image: spark:3.5.1-scala2.12-java11-python3-r-ubuntu
-    spark.kubernetes.namespace: spark-test
 status:
   currentAttemptSummary:
     attemptInfo:
@@ -180,10 +179,13 @@ Delete application Spark-pi and its secondary resources with

 #### Uninstallation

-To remove the installed resources from your cluster, use:
+To remove the installed resources from your cluster, reset the environment to its defaults, and
+shut down the cluster:

 ```bash
 helm uninstall spark-kubernetes-operator
+eval $(minikube docker-env --unset)
+minikube stop
 ```
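For context: `eval $(minikube docker-env --unset)` clears the `DOCKER_*` variables that a prior `eval $(minikube docker-env)` would have exported, pointing the shell's Docker client back at the host daemon, and `minikube stop` halts the cluster without deleting its state.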

 ### More examples
2 changes: 0 additions & 2 deletions spark-operator-docs/spark_application.md
@@ -20,14 +20,12 @@ apiVersion: org.apache.spark/v1alpha1
 kind: SparkApplication
 metadata:
   name: spark-pi
-  namespace: spark-test
 spec:
   mainClass: "org.apache.spark.examples.SparkPi"
   jars: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.1.jar"
   sparkConf:
     spark.executor.instances: "5"
     spark.kubernetes.container.image: "spark:3.5.1-scala2.12-java17-python3-ubuntu"
-    spark.kubernetes.namespace: "spark-test"
     spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
   runtimeVersions:
     scalaVersion: v2_12
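With both `metadata.namespace` and `spark.kubernetes.namespace` dropped from the example, the application is now created in whatever namespace the active kubectl context (or an explicit `kubectl -n` flag) selects — consistent with the e2e test default moving to `default` below.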

This file was deleted.

@@ -40,8 +40,8 @@

 import org.apache.spark.kubernetes.operator.status.ApplicationStateSummary;

-class AppSubmitToSucceedTest {
-    private static final Logger logger = LoggerFactory.getLogger(AppSubmitToSucceedTest.class);
+class SparkAppSubmitToSucceedTest {
+    private static final Logger logger = LoggerFactory.getLogger(SparkAppSubmitToSucceedTest.class);

     /**
      * Create Spark app(s) and wait for them to complete.
@@ -71,7 +71,7 @@ public static void main(String[] args) throws InterruptedException {
         String testAppYamlFilesDir = System.getProperty("spark.operator.test.app.yaml.files.dir",
                 "e2e-tests/spark-apps/");
         String testAppNamespace = System.getProperty("spark.operator.test.app.namespace",
-                "spark-test");
+                "default");

         Set<SparkApplication> testApps =
                 loadSparkAppsFromFile(client, new File(testAppYamlFilesDir));
@@ -47,9 +47,9 @@
 import org.apache.spark.kubernetes.operator.metrics.MetricsSystemFactory;
 import org.apache.spark.kubernetes.operator.metrics.source.OperatorJosdkMetrics;
 import org.apache.spark.kubernetes.operator.probe.ProbeService;
-import org.apache.spark.kubernetes.operator.reconciler.SparkApplicationReconciler;
+import org.apache.spark.kubernetes.operator.reconciler.SparkAppReconciler;
 import org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils;
-import org.apache.spark.kubernetes.operator.utils.StatusRecorder;
+import org.apache.spark.kubernetes.operator.utils.SparkAppStatusRecorder;

 import static org.apache.spark.kubernetes.operator.config.SparkOperatorConf.DynamicConfigEnabled;
 import static org.apache.spark.kubernetes.operator.config.SparkOperatorConf.DynamicConfigSelectorStr;
@@ -67,11 +67,12 @@ public class SparkOperator {
     private Operator sparkOperator;
     private Operator sparkOperatorConfMonitor;
     private KubernetesClient client;
-    private StatusRecorder statusRecorder;
+    private MetricsSystem metricsSystem;
+    private SparkAppSubmissionWorker appSubmissionWorker;
+    private SparkAppStatusRecorder sparkAppStatusRecorder;
     protected Set<RegisteredController<?>> registeredSparkControllers;
     protected Set<String> watchedNamespaces;

-    private MetricsSystem metricsSystem;
     private SentinelManager sentinelManager;
     private ProbeService probeService;
     private MetricsService metricsService;
@@ -80,7 +81,9 @@ public class SparkOperator {
     public SparkOperator() {
         this.metricsSystem = MetricsSystemFactory.createMetricsSystem();
         this.client = KubernetesClientFactory.buildKubernetesClient(metricsSystem);
-        this.statusRecorder = new StatusRecorder(SparkOperatorConf.getApplicationStatusListener());
+        this.appSubmissionWorker = new SparkAppSubmissionWorker();
+        this.sparkAppStatusRecorder = new SparkAppStatusRecorder(
+                SparkOperatorConf.getAppStatusListener());
         this.registeredSparkControllers = new HashSet<>();
         this.watchedNamespaces = SparkReconcilerUtils.getWatchedNamespaces();
         this.sentinelManager = new SentinelManager<SparkApplication>();
@@ -96,8 +99,8 @@ public SparkOperator() {
     protected Operator createOperator() {
         Operator op = new Operator(this::overrideOperatorConfigs);
         registeredSparkControllers.add(
-                op.register(new SparkApplicationReconciler(statusRecorder, sentinelManager),
-                        this::overrideControllerConfigs));
+                op.register(new SparkAppReconciler(appSubmissionWorker, sparkAppStatusRecorder,
+                        sentinelManager), this::overrideControllerConfigs));
         return op;
     }

@@ -28,7 +28,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;

-import org.apache.spark.kubernetes.operator.listeners.ApplicationStatusListener;
+import org.apache.spark.kubernetes.operator.listeners.SparkAppStatusListener;

 import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.defaultOperatorConfigLabels;
 import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.labelsAsStr;
@@ -208,9 +208,9 @@ public class SparkOperatorConf {
                     .defaultValue(3L)
                     .typeParameterClass(Long.class)
                     .build();
-    public static final ConfigOption<Long> AppReconcileIntervalSeconds =
+    public static final ConfigOption<Long> SparkAppReconcileIntervalSeconds =
             ConfigOption.<Long>builder()
-                    .key("spark.operator.app.reconcile.interval.seconds")
+                    .key("spark.operator.application.reconcile.interval.seconds")
                     .description(
                             "Interval (in seconds) to reconcile when application is starting " +
                                     "up. Note that reconcile is always expected to be triggered " +
@@ -315,12 +315,12 @@ public class SparkOperatorConf {
                     .enableDynamicOverride(true)
                     .typeParameterClass(Long.class)
                     .build();
-    public static final ConfigOption<String> APPLICATION_STATUS_LISTENER_CLASS_NAMES =
+    public static final ConfigOption<String> SPARK_APP_STATUS_LISTENER_CLASS_NAMES =
             ConfigOption.<String>builder()
                     .key("spark.operator.application.status.listener.class.names")
                     .defaultValue("")
                     .description(
-                            "Comma-separated names of ApplicationStatusListener class " +
+                            "Comma-separated names of SparkAppStatusListener class " +
                                     "implementations")
                     .enableDynamicOverride(false)
                     .typeParameterClass(String.class)
@@ -370,19 +370,19 @@ public class SparkOperatorConf {
                     .typeParameterClass(Long.class)
                     .build();

-    public static List<ApplicationStatusListener> getApplicationStatusListener() {
-        List<ApplicationStatusListener> listeners = new ArrayList<>();
+    public static List<SparkAppStatusListener> getAppStatusListener() {
+        List<SparkAppStatusListener> listeners = new ArrayList<>();
         String listenerNamesStr =
-                SparkOperatorConf.APPLICATION_STATUS_LISTENER_CLASS_NAMES.getValue();
+                SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES.getValue();
         if (StringUtils.isNotBlank(listenerNamesStr)) {
             try {
                 List<String> listenerNames =
                         Arrays.stream(listenerNamesStr.split(",")).map(String::trim)
                                 .collect(Collectors.toList());
                 for (String name : listenerNames) {
                     Class listenerClass = Class.forName(name);
-                    if (ApplicationStatusListener.class.isAssignableFrom(listenerClass)) {
-                        listeners.add((ApplicationStatusListener)
+                    if (SparkAppStatusListener.class.isAssignableFrom(listenerClass)) {
+                        listeners.add((SparkAppStatusListener)
                                 listenerClass.getConstructor().newInstance());
                     }
                 }
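The loader above is a plain reflection pattern. As a self-contained illustration of the same idea — with a hypothetical `Listener` interface standing in for the operator's listener types — the following compiles and runs on its own:

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerLoader {
    /** Hypothetical listener supertype standing in for SparkAppStatusListener. */
    public interface Listener {
    }

    /**
     * Mirrors getAppStatusListener(): split the comma-separated class-name
     * string, reflectively instantiate each entry through its no-arg
     * constructor, and keep only instances of the expected supertype.
     */
    public static List<Listener> load(String namesStr) throws Exception {
        List<Listener> listeners = new ArrayList<>();
        for (String name : namesStr.split(",")) {
            Class<?> clazz = Class.forName(name.trim());
            if (Listener.class.isAssignableFrom(clazz)) {
                listeners.add((Listener) clazz.getConstructor().newInstance());
            }
        }
        return listeners;
    }
}
```

One consequence of `getConstructor().newInstance()`, in the real code as in this sketch: every class named in `spark.operator.application.status.listener.class.names` must be public with a public no-arg constructor.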
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.kubernetes.operator.controller;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.spark.kubernetes.operator.BaseResource;
+
+public abstract class BaseContext<CR extends BaseResource<?, ?, ?, ?, ?>> {
+    public abstract CR getResource();
+    public abstract KubernetesClient getClient();
+}
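A sketch of what the new base type buys: helpers can be written once against `BaseContext<?>` instead of a concrete context class. The utility below is hypothetical (not part of this PR) and assumes the resource type exposes fabric8's `getMetadata()`, which `SparkApplication` does:

```java
import io.fabric8.kubernetes.client.KubernetesClient;

import org.apache.spark.kubernetes.operator.controller.BaseContext;

// Hypothetical utility: works against any BaseContext subclass (e.g. the
// SparkAppContext shown below) using only the two shared accessors.
public final class ContextLogger {
    public static void logReconcileTarget(BaseContext<?> context) {
        String name = context.getResource().getMetadata().getName();
        // getClient() exposes the fabric8 client tied to this reconcile call.
        KubernetesClient client = context.getClient();
        System.out.println("Reconciling " + name + " via " + client.getMasterUrl());
    }
}
```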
@@ -27,13 +27,13 @@
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;

-import org.apache.spark.kubernetes.operator.ApplicationResourceSpec;
+import org.apache.spark.kubernetes.operator.SparkAppResourceSpec;
+import org.apache.spark.kubernetes.operator.SparkAppSubmissionWorker;
 import org.apache.spark.kubernetes.operator.SparkApplication;
-import org.apache.spark.kubernetes.operator.reconciler.SparkApplicationReconcileUtils;
+import org.apache.spark.kubernetes.operator.reconciler.SparkAppReconcileUtils;

 import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.driverLabels;
 import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.executorLabels;
@@ -44,11 +44,11 @@
  */
 @RequiredArgsConstructor
 @Slf4j
-public class SparkApplicationContext {
-    @Getter
+public class SparkAppContext extends BaseContext<SparkApplication> {
     private final SparkApplication sparkApplication;
     private final Context<?> josdkContext;
-    private ApplicationResourceSpec secondaryResourceSpec;
+    private final SparkAppSubmissionWorker submissionWorker;
+    private SparkAppResourceSpec secondaryResourceSpec;

     public Optional<Pod> getDriverPod() {
         return josdkContext.getSecondaryResourcesAsStream(Pod.class)
@@ -64,16 +64,22 @@ public Set<Pod> getExecutorsForApplication() {
                 .collect(Collectors.toSet());
     }

-    private ApplicationResourceSpec getSecondaryResourceSpec() {
+    private SparkAppResourceSpec getSecondaryResourceSpec() {
         synchronized (this) {
             if (secondaryResourceSpec == null) {
-                secondaryResourceSpec = SparkApplicationReconcileUtils.buildResourceSpec(
-                        sparkApplication, josdkContext.getClient());
+                secondaryResourceSpec = SparkAppReconcileUtils.buildResourceSpec(sparkApplication,
+                        josdkContext.getClient(), submissionWorker);
             }
             return secondaryResourceSpec;
         }
     }

+    @Override
+    public SparkApplication getResource() {
+        return sparkApplication;
+    }
+
+    @Override
     public KubernetesClient getClient() {
         return josdkContext.getClient();
     }
@@ -57,6 +57,7 @@ public <T extends HasMetadata> T decorate(T resource) {
         if (!ownerReferenceExists) {
             ObjectMeta metaData = new ObjectMetaBuilder(resource.getMetadata())
                     .addToOwnerReferences(buildOwnerReferenceTo(driverPod))
+                    .addToLabels(driverPod.getMetadata().getLabels())
                     .build();
             resource.setMetadata(metaData);
         }
@@ -24,6 +24,6 @@
 /**
  * Custom listeners, if added, listen for Spark App status changes
  */
-public abstract class ApplicationStatusListener extends BaseStatusListener<ApplicationStatus,
+public abstract class SparkAppStatusListener extends BaseStatusListener<ApplicationStatus,
         SparkApplication> {
 }
@@ -22,7 +22,7 @@

 import lombok.Data;

-import static org.apache.spark.kubernetes.operator.config.SparkOperatorConf.AppReconcileIntervalSeconds;
+import static org.apache.spark.kubernetes.operator.config.SparkOperatorConf.SparkAppReconcileIntervalSeconds;

 /**
  * Represents the progress of a reconcile request
@@ -44,12 +44,12 @@ private ReconcileProgress(boolean completed, boolean requeue, Duration requeueAf

     public static ReconcileProgress proceed() {
         return new ReconcileProgress(false, true,
-                Duration.ofSeconds(AppReconcileIntervalSeconds.getValue()));
+                Duration.ofSeconds(SparkAppReconcileIntervalSeconds.getValue()));
     }

     public static ReconcileProgress completeAndDefaultRequeue() {
         return new ReconcileProgress(true, true,
-                Duration.ofSeconds(AppReconcileIntervalSeconds.getValue()));
+                Duration.ofSeconds(SparkAppReconcileIntervalSeconds.getValue()));
     }

     public static ReconcileProgress completeAndRequeueAfter(Duration requeueAfterDuration) {
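For orientation, a hypothetical helper showing how a reconciler might choose among the three factories (the real call sites live in the reconciler classes, which this diff does not show; the import path for `ReconcileProgress` is assumed):

```java
import java.time.Duration;

import org.apache.spark.kubernetes.operator.reconciler.ReconcileProgress;

// Hypothetical decision helper, not from the PR: finish or continue the
// current reconcile pass, and pick when to requeue.
final class ProgressChooser {
    static ReconcileProgress choose(boolean terminal, boolean retrySoon) {
        if (terminal) {
            // Done with this resource; requeue at the renamed default interval.
            return ReconcileProgress.completeAndDefaultRequeue();
        }
        if (retrySoon) {
            // Finish this pass but come back sooner than the default interval.
            return ReconcileProgress.completeAndRequeueAfter(Duration.ofSeconds(10));
        }
        // More reconcile steps remain in this pass.
        return ReconcileProgress.proceed();
    }
}
```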
@@ -32,8 +32,8 @@
 import io.fabric8.kubernetes.client.KubernetesClient;
 import lombok.extern.slf4j.Slf4j;

-import org.apache.spark.kubernetes.operator.ApplicationClientWorker;
-import org.apache.spark.kubernetes.operator.ApplicationResourceSpec;
+import org.apache.spark.kubernetes.operator.SparkAppResourceSpec;
+import org.apache.spark.kubernetes.operator.SparkAppSubmissionWorker;
 import org.apache.spark.kubernetes.operator.SparkApplication;
 import org.apache.spark.kubernetes.operator.decorators.DriverDecorator;
 import org.apache.spark.kubernetes.operator.utils.ModelUtils;
@@ -44,7 +44,7 @@
 import static org.apache.spark.kubernetes.operator.utils.ModelUtils.overrideExecutorTemplate;

 @Slf4j
-public class SparkApplicationReconcileUtils {
+public class SparkAppReconcileUtils {
     public static boolean enableForceDelete(SparkApplication app) {
         long timeoutThreshold = app.getSpec().getApplicationTolerations()
                 .getApplicationTimeoutConfig().getForceTerminationGracePeriodMillis();
@@ -53,19 +53,18 @@ public static boolean enableForceDelete(SparkApplication app) {
         return lastTransitionTime.plusMillis(timeoutThreshold).isBefore(Instant.now());
     }

-    public static ApplicationResourceSpec buildResourceSpec(final SparkApplication app,
-                                                            final KubernetesClient client) {
-        Map<String, String> confOverrides = overrideMetadataForSecondaryResources(app);
-        ApplicationResourceSpec resourceSpec =
-                ApplicationClientWorker.getResourceSpec(app, client, confOverrides);
+    public static SparkAppResourceSpec buildResourceSpec(final SparkApplication app,
+                                                         final KubernetesClient client,
+                                                         final SparkAppSubmissionWorker worker) {
+        Map<String, String> confOverrides = overrideDependencyConf(app);
+        SparkAppResourceSpec resourceSpec = worker.getResourceSpec(app, client, confOverrides);
         cleanUpTempResourcesForApp(app, confOverrides);
         DriverDecorator decorator = new DriverDecorator(app);
         decorator.decorate(resourceSpec.getConfiguredPod());
         return resourceSpec;
     }

-    private static Map<String, String> overrideMetadataForSecondaryResources(
-            final SparkApplication app) {
+    private static Map<String, String> overrideDependencyConf(final SparkApplication app) {
         Map<String, String> confOverrides = new HashMap<>();
         SparkReconcilerUtils.sparkAppResourceLabels(app).forEach((k, v) -> {
             confOverrides.put("spark.kubernetes.driver.label." + k, v);