Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fully qualify the property names.
  • Loading branch information
jiangzho committed Apr 8, 2024
commit bae3e140f20b6e227b3eab6691bbb7675314bc88
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,26 @@
*/
@Slf4j
public class SparkOperatorConf {
public static final String PREFIX = "spark.operator.";
public static final String METRIC_PREFIX = "spark.metrics.conf.operator.";
public static final String SINK = "sink.";
public static final String CLASS = "class";

public static final ConfigOption<String> OperatorAppName = ConfigOption.<String>builder()
.key(PREFIX + "name")
.key("spark.operator.name")
.typeParameterClass(String.class)
.description("Name of the operator.")
.defaultValue("spark-kubernetes-operator")
.enableDynamicOverride(false)
.build();
public static final ConfigOption<String> OperatorNamespace = ConfigOption.<String>builder()
.key(PREFIX + "namespace")
.key("spark.operator.namespace")
.typeParameterClass(String.class)
.description("Namespace that operator is deployed within.")
.defaultValue("spark-system")
.enableDynamicOverride(false)
.build();
public static final ConfigOption<Boolean> DynamicConfigEnabled = ConfigOption.<Boolean>builder()
.key(PREFIX + "dynamic.config.enabled")
.key("spark.operator.dynamic.config.enabled")
.typeParameterClass(Boolean.class)
.description(
"When enabled, operator would use config map as source of truth for config " +
Expand All @@ -69,15 +68,15 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<String> DynamicConfigSelectorStr =
ConfigOption.<String>builder()
.key(PREFIX + "dynamic.config.selector.str")
.key("spark.operator.dynamic.config.selector.str")
.typeParameterClass(String.class)
.description("The selector str applied to dynamic config map.")
.defaultValue(labelsAsStr(defaultOperatorConfigLabels()))
.enableDynamicOverride(false)
.build();
public static final ConfigOption<Boolean> TerminateOnInformerFailure =
ConfigOption.<Boolean>builder()
.key(PREFIX + "terminate.on.informer.failure")
.key("spark.operator.terminate.on.informer.failure")
.typeParameterClass(Boolean.class)
.description(
"Enable to indicate informer errors should stop operator startup. If " +
Expand All @@ -89,7 +88,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Integer> TerminationTimeoutSeconds =
ConfigOption.<Integer>builder()
.key(PREFIX + "termination.timeout.seconds")
.key("spark.operator.termination.timeout.seconds")
.description(
"Grace period for operator shutdown before reconciliation threads " +
"are killed.")
Expand All @@ -99,7 +98,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Integer> ReconcilerParallelism =
ConfigOption.<Integer>builder()
.key(PREFIX + "reconciler.parallelism")
.key("spark.operator.reconciler.parallelism")
.description(
"Thread pool size for Spark Operator reconcilers. Use -1 for " +
"unbounded pool.")
Expand All @@ -109,15 +108,15 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Integer> RateLimiterRefreshPeriodSeconds =
ConfigOption.<Integer>builder()
.key(PREFIX + "rate.limiter.refresh.period.seconds")
.key("spark.operator.rate.limiter.refresh.period.seconds")
.description(
"Operator rate limiter refresh period(in seconds) for each resource.")
.enableDynamicOverride(false)
.typeParameterClass(Integer.class)
.defaultValue(15)
.build();
public static final ConfigOption<Integer> RateLimiterLimit = ConfigOption.<Integer>builder()
.key(PREFIX + "rate.limiter.limit")
.key("spark.operator.rate.limiter.limit")
.description(
"Max number of reconcile loops triggered within the rate limiter refresh " +
"period for each resource. Setting the limit <= 0 disables the " +
Expand All @@ -128,7 +127,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Integer> RetryInitialInternalSeconds =
ConfigOption.<Integer>builder()
.key(PREFIX + "retry.initial.internal.seconds")
.key("spark.operator.retry.initial.internal.seconds")
.description(
"Initial interval(in seconds) of retries on unhandled controller " +
"errors.")
Expand All @@ -138,15 +137,15 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Double> RetryInternalMultiplier =
ConfigOption.<Double>builder()
.key(PREFIX + "retry.internal.multiplier")
.key("spark.operator.retry.internal.multiplier")
.description("Interval multiplier of retries on unhandled controller errors.")
.enableDynamicOverride(false)
.typeParameterClass(Double.class)
.defaultValue(1.5)
.build();
public static final ConfigOption<Integer> RetryMaxIntervalSeconds =
ConfigOption.<Integer>builder()
.key(PREFIX + "retry.max.interval.seconds")
.key("spark.operator.retry.max.interval.seconds")
.description(
"Max interval(in seconds) of retries on unhandled controller errors. " +
"Set to -1 for unlimited.")
Expand All @@ -155,30 +154,30 @@ public class SparkOperatorConf {
.defaultValue(-1)
.build();
public static final ConfigOption<Integer> RetryMaxAttempts = ConfigOption.<Integer>builder()
.key(PREFIX + "retry.max.attempts")
.key("spark.operator.retry.max.attempts")
.description("Max attempts of retries on unhandled controller errors.")
.enableDynamicOverride(false)
.typeParameterClass(Integer.class)
.defaultValue(15)
.build();
public static final ConfigOption<Long> DriverCreateMaxAttempts = ConfigOption.<Long>builder()
.key(PREFIX + "driver.create.max.attempts")
.key("spark.operator.driver.create.max.attempts")
.description(
"Maximal number of retry attempts of requesting driver for Spark application.")
.defaultValue(3L)
.typeParameterClass(Long.class)
.build();
public static final ConfigOption<Long> MaxRetryAttemptOnKubeServerFailure =
ConfigOption.<Long>builder()
.key(PREFIX + "max.retry.attempts.on.k8s.failure")
.key("spark.operator.max.retry.attempts.on.k8s.failure")
.description(
"Maximal number of retry attempts of requests to k8s server upon " +
"response 429 and 5xx.")
.defaultValue(3L)
.typeParameterClass(Long.class)
.build();
public static final ConfigOption<Long> RetryAttemptAfterSeconds = ConfigOption.<Long>builder()
.key(PREFIX + "retry.attempt.after.seconds")
.key("spark.operator.retry.attempt.after.seconds")
.description(
"Default time (in seconds) to wait till next request. This would be used if " +
"server does not set Retry-After in response.")
Expand All @@ -187,13 +186,13 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Long> MaxRetryAttemptAfterSeconds =
ConfigOption.<Long>builder()
.key(PREFIX + "max.retry.attempt.after.seconds")
.key("spark.operator.max.retry.attempt.after.seconds")
.description("Maximal time (in seconds) to wait till next request.")
.defaultValue(15L)
.typeParameterClass(Long.class)
.build();
public static final ConfigOption<Long> StatusPatchMaxRetry = ConfigOption.<Long>builder()
.key(PREFIX + "status.patch.max.retry")
.key("spark.operator.status.patch.max.retry")
.description(
"Maximal number of retry attempts of requests to k8s server for resource " +
"status update.")
Expand All @@ -202,7 +201,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Long> StatusPatchFailureBackoffSeconds =
ConfigOption.<Long>builder()
.key(PREFIX + "status.patch.failure.backoff.seconds")
.key("spark.operator.status.patch.failure.backoff.seconds")
.description(
"Default time (in seconds) to wait till next request to patch " +
"resource status update.")
Expand All @@ -211,7 +210,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Long> AppReconcileIntervalSeconds =
ConfigOption.<Long>builder()
.key(PREFIX + "app.reconcile.interval.seconds")
.key("spark.operator.app.reconcile.interval.seconds")
.description(
"Interval (in seconds) to reconcile when application is is starting " +
"up. Note that reconcile is always expected to be triggered " +
Expand All @@ -223,7 +222,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Long> ForegroundRequestTimeoutSeconds =
ConfigOption.<Long>builder()
.key(PREFIX + "foreground.request.timeout.seconds")
.key("spark.operator.foreground.request.timeout.seconds")
.description(
"Timeout (in seconds) to for requests made to API server. this " +
"applies only to foreground requests.")
Expand All @@ -232,7 +231,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<String> OperatorWatchedNamespaces =
ConfigOption.<String>builder()
.key(PREFIX + "watched.namespaces")
.key("spark.operator.watched.namespaces")
.description(
"Comma-separated list of namespaces that the operator would be " +
"watching for Spark resources. If unset, operator would " +
Expand All @@ -242,7 +241,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Boolean> TrimAttemptStateTransitionHistory =
ConfigOption.<Boolean>builder()
.key(PREFIX + "trim.attempt.state.transition.history")
.key("spark.operator.trim.attempt.state.transition.history")
.description(
"When enabled, operator would trim state transition history when a " +
"new attempt starts, keeping previous attempt summary only.")
Expand All @@ -251,7 +250,7 @@ public class SparkOperatorConf {
.build();

public static final ConfigOption<Boolean> JOSDKMetricsEnabled = ConfigOption.<Boolean>builder()
.key(PREFIX + "josdk.metrics.enabled")
.key("spark.operator.josdk.metrics.enabled")
.description(
"When enabled, the josdk metrics will be added in metrics source and " +
"configured for operator.")
Expand All @@ -260,7 +259,7 @@ public class SparkOperatorConf {

public static final ConfigOption<Boolean> KubernetesClientMetricsEnabled =
ConfigOption.<Boolean>builder()
.key(PREFIX + "kubernetes.client.metrics.enabled")
.key("spark.operator.kubernetes.client.metrics.enabled")
.defaultValue(true)
.description(
"Enable KubernetesClient metrics for measuring the HTTP traffic to " +
Expand All @@ -271,7 +270,7 @@ public class SparkOperatorConf {

public static final ConfigOption<Boolean>
KubernetesClientMetricsGroupByResponseCodeGroupEnabled = ConfigOption.<Boolean>builder()
.key(PREFIX + "kubernetes.client.metrics.group.by.response.code.group.enable")
.key("spark.operator.kubernetes.client.metrics.group.by.response.code.group.enable")
.description(
"When enabled, additional metrics group by http response code group(1xx, " +
"2xx, 3xx, 4xx, 5xx) received from API server will be added. Users " +
Expand All @@ -280,15 +279,15 @@ public class SparkOperatorConf {
.defaultValue(true)
.build();
public static final ConfigOption<Integer> OperatorProbePort = ConfigOption.<Integer>builder()
.key(PREFIX + "probe.port")
.key("spark.operator.probe.port")
.defaultValue(18080)
.description("The port used for health/readiness check probe status.")
.typeParameterClass(Integer.class)
.enableDynamicOverride(false)
.build();

public static final ConfigOption<Integer> OperatorMetricsPort = ConfigOption.<Integer>builder()
.key(PREFIX + "metrics.port")
.key("spark.operator.metrics.port")
.defaultValue(19090)
.description("The port used for checking metrics")
.typeParameterClass(Integer.class)
Expand All @@ -297,7 +296,7 @@ public class SparkOperatorConf {

public static final ConfigOption<Integer> SentinelExecutorServicePoolSize =
ConfigOption.<Integer>builder()
.key(PREFIX + "sentinel.executor.pool.size")
.key("spark.operator.sentinel.executor.pool.size")
.description(
"Size of executor service in Sentinel Managers to check the health " +
"of sentinel resources.")
Expand All @@ -308,7 +307,7 @@ public class SparkOperatorConf {

public static final ConfigOption<Long> SENTINEL_RESOURCE_RECONCILIATION_DELAY =
ConfigOption.<Long>builder()
.key(PREFIX + "health.sentinel.resource.reconciliation.delay.seconds")
.key("spark.operator.health.sentinel.resource.reconciliation.delay.seconds")
.defaultValue(60L)
.description(
"Allowed max time(seconds) between spec update and reconciliation " +
Expand All @@ -318,7 +317,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<String> APPLICATION_STATUS_LISTENER_CLASS_NAMES =
ConfigOption.<String>builder()
.key(PREFIX + "application.status.listener.class.names")
.key("spark.operator.application.status.listener.class.names")
.defaultValue("")
.description(
"Comma-separated names of ApplicationStatusListener class " +
Expand All @@ -328,7 +327,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Boolean> LEADER_ELECTION_ENABLED =
ConfigOption.<Boolean>builder()
.key(PREFIX + "leader.election.enabled")
.key("spark.operator.leader.election.enabled")
.defaultValue(false)
.description(
"Enable leader election for the operator to allow running standby " +
Expand All @@ -338,7 +337,7 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<String> LEADER_ELECTION_LEASE_NAME =
ConfigOption.<String>builder()
.key(PREFIX + "leader.election.lease.name")
.key("spark.operator.leader.election.lease.name")
.defaultValue("spark-operator-lease")
.description(
"Leader election lease name, must be unique for leases in the same " +
Expand All @@ -348,23 +347,23 @@ public class SparkOperatorConf {
.build();
public static final ConfigOption<Long> LEADER_ELECTION_LEASE_DURATION_SECONDS =
ConfigOption.<Long>builder()
.key(PREFIX + "leader.election.lease.duration.seconds")
.key("spark.operator.leader.election.lease.duration.seconds")
.defaultValue(1200L)
.description("Leader election lease duration.")
.enableDynamicOverride(false)
.typeParameterClass(Long.class)
.build();
public static final ConfigOption<Long> LEADER_ELECTION_RENEW_DEADLINE_SECONDS =
ConfigOption.<Long>builder()
.key(PREFIX + "leader.election.renew.deadline.seconds")
.key("spark.operator.leader.election.renew.deadline.seconds")
.defaultValue(600L)
.description("Leader election renew deadline.")
.enableDynamicOverride(false)
.typeParameterClass(Long.class)
.build();
public static final ConfigOption<Long> LEADER_ELECTION_RETRY_PERIOD_SECONDS =
ConfigOption.<Long>builder()
.key(PREFIX + "leader.election.retry.period.seconds")
.key("spark.operator.leader.election.retry.period.seconds")
.defaultValue(180L)
.description("Leader election retry period.")
.enableDynamicOverride(false)
Expand Down