Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Style fix: indent and imports
* Uses 2-space indent as Apache Spark
* Fix import order to java - 3pc - spark
  • Loading branch information
jiangzho committed Apr 4, 2024
commit 734a48b1239e3664d12cb724343fe5b8c7cda62c
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;

import org.apache.spark.kubernetes.operator.spec.BaseSpec;
import org.apache.spark.kubernetes.operator.status.BaseAttemptSummary;
import org.apache.spark.kubernetes.operator.status.BaseState;
import org.apache.spark.kubernetes.operator.status.BaseStatus;

public class BaseResource<S, AS extends BaseAttemptSummary, STATE extends BaseState<S>,
SPEC extends BaseSpec, STATUS extends BaseStatus<S, STATE, AS>>
extends CustomResource<SPEC, STATUS> implements Namespaced {
SPEC extends BaseSpec, STATUS extends BaseStatus<S, STATE, AS>>
extends CustomResource<SPEC, STATUS> implements Namespaced {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,59 @@
package org.apache.spark.kubernetes.operator;

public class Constants {
public static final String API_GROUP = "org.apache.spark";
public static final String API_VERSION = "v1alpha1";
public static final String LABEL_SPARK_APPLICATION_NAME = "spark.operator/spark-app-name";
public static final String LABEL_SPARK_OPERATOR_NAME = "spark.operator/name";
public static final String LABEL_RESOURCE_NAME = "app.kubernetes.io/name";
public static final String LABEL_SPARK_ROLE_NAME = "spark-role";
public static final String LABEL_SPARK_ROLE_DRIVER_VALUE = "driver";
public static final String LABEL_SPARK_ROLE_EXECUTOR_VALUE = "executor";
public static final String SPARK_CONF_SENTINEL_DUMMY_FIELD = "sentinel.dummy.number";
public static final String API_GROUP = "org.apache.spark";
public static final String API_VERSION = "v1alpha1";
public static final String LABEL_SPARK_APPLICATION_NAME = "spark.operator/spark-app-name";
public static final String LABEL_SPARK_OPERATOR_NAME = "spark.operator/name";
public static final String LABEL_RESOURCE_NAME = "app.kubernetes.io/name";
public static final String LABEL_SPARK_ROLE_NAME = "spark-role";
public static final String LABEL_SPARK_ROLE_DRIVER_VALUE = "driver";
public static final String LABEL_SPARK_ROLE_EXECUTOR_VALUE = "executor";
public static final String SPARK_CONF_SENTINEL_DUMMY_FIELD = "sentinel.dummy.number";

public static final String SENTINEL_LABEL = "spark.operator/sentinel";
public static final String SENTINEL_LABEL = "spark.operator/sentinel";

// Default state messages
public static final String DriverRequestedMessage =
"Requested driver from resource scheduler. ";
public static final String DriverCompletedMessage =
"Spark application completed successfully. ";
public static final String DriverTerminatedBeforeInitializationMessage =
"Driver container is terminated without SparkContext / SparkSession initialization. ";
public static final String DriverFailedInitContainersMessage =
"Driver has failed init container(s). Refer last observed status for details. ";
public static final String DriverFailedMessage =
"Driver has one or more failed critical container(s), refer last observed status for " +
"details. ";
public static final String DriverSucceededMessage =
"Driver has critical container(s) exited with 0. ";
public static final String DriverRestartedMessage =
"Driver has one or more critical container(s) restarted unexpectedly, refer last " +
"observed status for details. ";
public static final String AppStopRequestReceivedMessage =
"Received request to shutdown Spark application. ";
public static final String AppCancelledMessage =
"Spark application has been shutdown as requested. ";
public static final String DriverUnexpectedRemovedMessage =
"Driver removed. This could caused by 'exit' called in driver process with non-zero " +
"code, involuntary disruptions or unintentional destroy behavior, check " +
"Kubernetes events for more details. ";
public static final String DriverLaunchTimeoutMessage =
"The driver has not responded to the initial health check request within the " +
"allotted start-up time. This can be configured by setting " +
".spec.applicationTolerations.applicationTimeoutConfig ";
public static final String DriverRunning = "Driver has started running. ";
public static final String DriverReady = "Driver has reached ready state. ";
public static final String SubmittedStateMessage =
"Spark application has been created on Kubernetes Cluster. ";
public static final String UnknownStateMessage = "Cannot process application status. ";
public static final String ExceedMaxRetryAttemptMessage =
"The maximum number of restart attempts (%d) has been exceeded. ";
public static final String ScheduleFailureMessage =
"Failed to request driver from scheduler backend. ";
public static final String RunningHealthyMessage = "Application is running healthy. ";
public static final String InitializedWithBelowThresholdExecutorsMessage =
"The application is running with less than minimal number of requested initial " +
"executors. ";
public static final String RunningWithBelowThresholdExecutorsMessage =
"The Spark application is running with less than minimal number of requested " +
"executors. ";
public static final String ExecutorLaunchTimeoutMessage =
"The Spark application failed to get enough executors in the given time threshold. ";
// Default state messages
public static final String DriverRequestedMessage = "Requested driver from resource scheduler. ";
public static final String DriverCompletedMessage = "Spark application completed successfully. ";
public static final String DriverTerminatedBeforeInitializationMessage =
"Driver container is terminated without SparkContext / SparkSession initialization. ";
public static final String DriverFailedInitContainersMessage =
"Driver has failed init container(s). Refer last observed status for details. ";
public static final String DriverFailedMessage =
"Driver has one or more failed critical container(s), refer last observed status for " +
"details. ";
public static final String DriverSucceededMessage =
"Driver has critical container(s) exited with 0. ";
public static final String DriverRestartedMessage =
"Driver has one or more critical container(s) restarted unexpectedly, refer last " +
"observed status for details. ";
public static final String AppCancelledMessage =
"Spark application has been shutdown as requested. ";
public static final String DriverUnexpectedRemovedMessage =
"Driver removed. This could caused by 'exit' called in driver process with non-zero " +
"code, involuntary disruptions or unintentional destroy behavior, check " +
"Kubernetes events for more details. ";
public static final String DriverLaunchTimeoutMessage =
"The driver has not responded to the initial health check request within the " +
"allotted start-up time. This can be configured by setting " +
".spec.applicationTolerations.applicationTimeoutConfig ";
public static final String DriverRunning = "Driver has started running. ";
public static final String DriverReady = "Driver has reached ready state. ";
public static final String SubmittedStateMessage =
"Spark application has been created on Kubernetes Cluster. ";
public static final String UnknownStateMessage = "Cannot process application status. ";
public static final String ExceedMaxRetryAttemptMessage =
"The maximum number of restart attempts (%d) has been exceeded. ";
public static final String ScheduleFailureMessage =
"Failed to request driver from scheduler backend. ";
public static final String RunningHealthyMessage = "Application is running healthy. ";
public static final String InitializedWithBelowThresholdExecutorsMessage =
"The application is running with less than minimal number of requested initial " +
"executors. ";
public static final String RunningWithBelowThresholdExecutorsMessage =
"The Spark application is running with less than minimal number of requested " +
"executors. ";
public static final String ExecutorLaunchTimeoutMessage =
"The Spark application failed to get enough executors in the given time threshold. ";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

import org.apache.spark.kubernetes.operator.spec.ApplicationSpec;
import org.apache.spark.kubernetes.operator.status.ApplicationAttemptSummary;
import org.apache.spark.kubernetes.operator.status.ApplicationState;
Expand All @@ -37,15 +38,15 @@
@ShortNames({"sparkapp"})
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkApplication extends
BaseResource<ApplicationStateSummary, ApplicationAttemptSummary, ApplicationState,
ApplicationSpec, ApplicationStatus> {
@Override
public ApplicationStatus initStatus() {
return new ApplicationStatus();
}
BaseResource<ApplicationStateSummary, ApplicationAttemptSummary, ApplicationState,
ApplicationSpec, ApplicationStatus> {
@Override
public ApplicationStatus initStatus() {
return new ApplicationStatus();
}

@Override
public ApplicationSpec initSpec() {
return new ApplicationSpec();
}
@Override
public ApplicationSpec initSpec() {
return new ApplicationSpec();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
import io.fabric8.kubernetes.api.model.HasMetadata;

public interface ResourceDecorator {
<T extends HasMetadata> T decorate(T resource);
<T extends HasMetadata> T decorate(T resource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.spark.kubernetes.operator.spec;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.generator.annotation.Required;
Expand All @@ -27,9 +30,6 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -38,20 +38,20 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ApplicationSpec extends BaseSpec {
protected String mainClass;
@Required
protected RuntimeVersions runtimeVersions;
protected String jars;
protected String pyFiles;
protected String sparkRFiles;
protected String files;
@Builder.Default
protected DeploymentMode deploymentMode = DeploymentMode.ClusterMode;
protected String proxyUser;
@Builder.Default
protected List<String> driverArgs = new ArrayList<>();
@Builder.Default
protected ApplicationTolerations applicationTolerations = new ApplicationTolerations();
protected BaseApplicationTemplateSpec driverSpec;
protected BaseApplicationTemplateSpec executorSpec;
protected String mainClass;
@Required
protected RuntimeVersions runtimeVersions;
protected String jars;
protected String pyFiles;
protected String sparkRFiles;
protected String files;
@Builder.Default
protected DeploymentMode deploymentMode = DeploymentMode.ClusterMode;
protected String proxyUser;
@Builder.Default
protected List<String> driverArgs = new ArrayList<>();
@Builder.Default
protected ApplicationTolerations applicationTolerations = new ApplicationTolerations();
protected BaseApplicationTemplateSpec driverSpec;
protected BaseApplicationTemplateSpec executorSpec;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ApplicationTimeoutConfig {
@Builder.Default
protected Long driverStartTimeoutMillis = 300 * 1000L;
@Builder.Default
protected Long sparkSessionStartTimeoutMillis = 300 * 1000L;
@Builder.Default
protected Long executorStartTimeoutMillis = 300 * 1000L;
@Builder.Default
protected Long forceTerminationGracePeriodMillis = 300 * 1000L;
@Builder.Default
protected Long terminationRequeuePeriodMillis = 2 * 1000L;
@Builder.Default
protected Long driverStartTimeoutMillis = 300 * 1000L;
@Builder.Default
protected Long sparkSessionStartTimeoutMillis = 300 * 1000L;
@Builder.Default
protected Long executorStartTimeoutMillis = 300 * 1000L;
@Builder.Default
protected Long forceTerminationGracePeriodMillis = 300 * 1000L;
@Builder.Default
protected Long terminationRequeuePeriodMillis = 2 * 1000L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ApplicationTolerations {
@Builder.Default
protected RestartConfig restartConfig = new RestartConfig();
@Builder.Default
protected ApplicationTimeoutConfig applicationTimeoutConfig = new ApplicationTimeoutConfig();
/**
* Determine the toleration behavior for executor / worker instances.
*/
@Builder.Default
protected InstanceConfig instanceConfig = new InstanceConfig();
/**
* If disabled, operator would not attempt to delete resources after app terminates.
* While this can be helpful in dev phase, it shall not be enabled for prod use cases.
* Caution: in order to avoid resource conflicts among multiple attempts, this can be disabled
* iff restart policy is set to Never.
*/
@Builder.Default
protected Boolean deleteOnTermination = true;
@Builder.Default
protected RestartConfig restartConfig = new RestartConfig();
@Builder.Default
protected ApplicationTimeoutConfig applicationTimeoutConfig = new ApplicationTimeoutConfig();
/**
* Determine the toleration behavior for executor / worker instances.
*/
@Builder.Default
protected InstanceConfig instanceConfig = new InstanceConfig();
/**
* If disabled, operator would not attempt to delete resources after app terminates.
* While this can be helpful in dev phase, it shall not be enabled for prod use cases.
* Caution: in order to avoid resource conflicts among multiple attempts, this can be disabled
* iff restart policy is set to Never.
*/
@Builder.Default
protected Boolean deleteOnTermination = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class BaseApplicationTemplateSpec {
protected PodTemplateSpec podTemplateSpec;
protected PodTemplateSpec podTemplateSpec;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

package org.apache.spark.kubernetes.operator.spec;

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import org.apache.spark.kubernetes.operator.diff.Diffable;

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.kubernetes.operator.diff.Diffable;

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class BaseSpec implements Diffable<BaseSpec> {
protected Map<String, String> sparkConf = new HashMap<>();
protected Map<String, String> sparkConf = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
package org.apache.spark.kubernetes.operator.spec;

public enum DeploymentMode {
ClusterMode,
ClientMode
ClusterMode,
ClientMode
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@
* Spark would try to bring up 10 executors as defined in SparkConf. In addition, from SparkApp
* perspective,
* + If Spark app acquires less than 5 executors in given tine window
* (.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis) after
* submitted, it would be shut down proactively in order to avoid resource deadlock.
* (.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis) after
* submitted, it would be shut down proactively in order to avoid resource deadlock.
* + Spark app would be marked as 'RUNNING_WITH_PARTIAL_CAPACITY' if it loses executors after
* successfully start up.
* successfully start up.
* + Spark app would be marked as 'RUNNING_HEALTHY' if it has at least min executors after
* successfully start up.
* successfully start up.
*/
@Data
@NoArgsConstructor
Expand All @@ -57,10 +57,10 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class InstanceConfig {
@Builder.Default
protected long initExecutors = 0L;
@Builder.Default
protected long minExecutors = 0L;
@Builder.Default
protected long maxExecutors = 0L;
@Builder.Default
protected long initExecutors = 0L;
@Builder.Default
protected long minExecutors = 0L;
@Builder.Default
protected long maxExecutors = 0L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.spark.kubernetes.operator.spec;

public enum JDKVersion {
Java11,
Java17,
Java23
Java11,
Java17,
Java23
}
Loading