diff --git a/config/pmd/ruleset.xml b/config/pmd/ruleset.xml index 515783e5..3328463c 100644 --- a/config/pmd/ruleset.xml +++ b/config/pmd/ruleset.xml @@ -28,7 +28,6 @@ - @@ -37,7 +36,6 @@ - diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java index 6eb88673..530848e8 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java @@ -271,11 +271,11 @@ private long toSeconds(long startTimeInMilliseconds) { private Histogram getHistogram(Class kclass, String... names) { String name = MetricRegistry.name(kclass.getSimpleName(), names).toLowerCase(); Histogram histogram; - if (!histograms.containsKey(name)) { + if (histograms.containsKey(name)) { + histogram = histograms.get(name); + } else { histogram = metricRegistry.histogram(name); histograms.put(name, histogram); - } else { - histogram = histograms.get(name); } return histogram; } @@ -283,11 +283,11 @@ private Histogram getHistogram(Class kclass, String... names) { private Counter getCounter(Class klass, String... names) { String name = MetricRegistry.name(klass.getSimpleName(), names).toLowerCase(); Counter counter; - if (!counters.containsKey(name)) { + if (counters.containsKey(name)) { + counter = counters.get(name); + } else { counter = metricRegistry.counter(name); counters.put(name, counter); - } else { - counter = counters.get(name); } return counter; } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/probe/ProbeService.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/probe/ProbeService.java index 48159f5b..a6c7bb1e 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/probe/ProbeService.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/probe/ProbeService.java @@ -41,14 +41,13 @@ public class ProbeService { public ProbeService( List operators, List> sentinelManagers, Executor executor) { - HealthProbe healthProbe = new HealthProbe(operators, sentinelManagers); try { this.server = HttpServer.create(new InetSocketAddress(OPERATOR_PROBE_PORT.getValue()), 0); } catch (IOException e) { throw new RuntimeException("Failed to create Probe Service Server", e); } server.createContext(READYZ, new ReadinessProbe(operators)); - server.createContext(HEALTHZ, healthProbe); + server.createContext(HEALTHZ, new HealthProbe(operators, sentinelManagers)); server.setExecutor(executor); } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java index 0be8a6aa..3956935c 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java @@ -204,7 +204,6 @@ protected List getReconcileSteps(final SparkApplication app) { public DeleteControl cleanup( SparkApplication sparkApplication, Context context) { LoggingUtils.TrackedMDC trackedMDC = new LoggingUtils.TrackedMDC(); - DeleteControl deleteControl = DeleteControl.defaultDelete(); try { trackedMDC.set(sparkApplication); log.info("Cleaning up resources for SparkApp."); @@ -229,6 +228,6 @@ public DeleteControl cleanup( trackedMDC.reset(); } sparkAppStatusRecorder.removeCachedStatus(sparkApplication); - return deleteControl; + return DeleteControl.defaultDelete(); } } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/observers/AppDriverTimeoutObserver.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/observers/AppDriverTimeoutObserver.java index a32968ac..0dda2826 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/observers/AppDriverTimeoutObserver.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/observers/AppDriverTimeoutObserver.java @@ -58,8 +58,6 @@ public class AppDriverTimeoutObserver extends BaseAppDriverObserver { @Override public Optional observe( Pod driver, ApplicationSpec spec, ApplicationStatus currentStatus) { - Instant lastTransitionTime = - Instant.parse(currentStatus.getCurrentState().getLastTransitionTime()); long timeoutThreshold; Supplier supplier; ApplicationTimeoutConfig timeoutConfig = @@ -82,6 +80,8 @@ public Optional observe( // No timeout check needed for other states return Optional.empty(); } + Instant lastTransitionTime = + Instant.parse(currentStatus.getCurrentState().getLastTransitionTime()); if (timeoutThreshold > 0L && lastTransitionTime.plusMillis(timeoutThreshold).isBefore(Instant.now())) { ApplicationState state = supplier.get(); diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java index 0c529a96..6d614ece 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java @@ -76,7 +76,10 @@ public ReconcileProgress reconcile( } } } - if (!proposedStateSummary.equals(prevStateSummary)) { + if (proposedStateSummary.equals(prevStateSummary)) { + return observeDriver( + context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver())); + } else { statusRecorder.persistStatus( context, context @@ -84,9 +87,6 @@ public ReconcileProgress reconcile( .getStatus() .appendNewState(new ApplicationState(proposedStateSummary, stateMessage))); return completeAndDefaultRequeue(); - } else { - return observeDriver( - context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver())); } } } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java index ca25833b..176bbcc0 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/ReconcilerUtils.java @@ -87,9 +87,7 @@ public static Optional getOrCreateSecondaryResource( maxAttempts); } // retry only on 409 Conflict - if (e.getCode() != 409) { - throw e; - } else { + if (e.getCode() == 409) { if (isConflictForExistingResource(e)) { current = getResource(client, resource); if (current.isPresent()) { @@ -100,6 +98,8 @@ public static Optional getOrCreateSecondaryResource( log.error("Max Retries exceeded while trying to create resource"); throw e; } + } else { + throw e; } } } @@ -147,10 +147,10 @@ public static void deleteResourceIfExists( .delete(); } } catch (KubernetesClientException e) { - if (e.getCode() != 404) { - throw e; - } else { + if (e.getCode() == 404) { log.info("Pod to delete does not exist, proceeding..."); + } else { + throw e; } } } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java index 42c07c1f..7aa7c428 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java @@ -88,8 +88,6 @@ private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient clien return; } - STATUS prevStatus = objectMapper.convertValue(previousStatusNode, statusClass); - Exception err = null; long maxRetry = API_STATUS_PATCH_MAX_ATTEMPTS.getValue(); for (long i = 0; i < maxRetry; i++) { @@ -110,6 +108,7 @@ private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient clien } statusCache.put(resourceId, newStatusNode); + STATUS prevStatus = objectMapper.convertValue(previousStatusNode, statusClass); statusListeners.forEach( listener -> { listener.listenStatus(resource, prevStatus, resource.getStatus());