
Add coordinator side hash joins to analytics engine #21480

Open
mch2 wants to merge 1 commit into opensearch-project:main from mch2:pr-joins

Conversation

@mch2
Member

@mch2 mch2 commented May 5, 2026

Description

Summary

Brings PPL join coverage (inner, left outer, cross, lookup, lookup+rename) online through the analytics-engine route, plus fixes uncovered during the merge from main.

  • JoinCommandIT — 6 ITs covering inner, left outer, cross, lookup, lookup+rename (all green). appendcol skipped with @AwaitsFix — depends on window-function capability track (separate PR).
  • Cross join marking — OpenSearchJoinRule.matches relaxed from !leftKeys.isEmpty() to info.isEqui(), so key-less cross joins (which are trivially equi) now match.
  • CAST project capability — added Scalar(CAST, ...) to DataFusionAnalyticsBackendPlugin.projectCapabilities(). LEFT OUTER JOIN injects CAST for nullable coercion; without this the project rule rejected the expression.
  • Substrait names-list emission fix (general correctness — not just joins) — DataFusionFragmentConvertor.rewire(Plan, Rel) previously reused innerRoot.getNames() for the wrapper's output names. When the wrapper's output schema differs from the inner plan's (e.g. Aggregate over Join shrinks 4 columns → 1), this emitted a mismatched Plan.Root.names list and DataFusion rejected with "Names list must match exactly to nested schema, but found N uses for M names". Fixed by taking the wrapper's own names.
  • RowProducingSink.close() no longer drops buffered batches — close() used to release and clear the buffer. ShardFragmentStageExecution.onShardTerminated invokes close() before the SUCCEEDED transition; the PlanWalker's completion listener reads results during that transition, so it saw 0 rows. The terminal root sink is now a no-op on close; the downstream consumer (DefaultPlanExecutor#batchesToRows) already owns per-batch cleanup. The error path keeps leak safety via the new releaseUnread().
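The matches() relaxation in the cross-join bullet can be illustrated with a minimal sketch. The JoinInfo record here is a hypothetical stand-in for Calcite's JoinInfo (equiKeyCount and hasNonEquiConditions are invented names), not the real API:

```java
import java.util.Objects;

/**
 * Sketch of the matches() relaxation: requiring non-empty equi keys
 * rejects cross joins, which have zero keys but are still trivially equi.
 * JoinInfo is a simplified stand-in, not Calcite's class.
 */
public class JoinMatchSketch {
    record JoinInfo(int equiKeyCount, boolean hasNonEquiConditions) {
        boolean isEqui() { return !hasNonEquiConditions; }
    }

    // Old guard: only joins with at least one equi key matched.
    static boolean matchesOld(JoinInfo info) { return info.equiKeyCount() > 0; }

    // New guard: any join whose condition is purely equi matches,
    // including a cross join (condition TRUE, zero keys).
    static boolean matchesNew(JoinInfo info) { return info.isEqui(); }

    public static void main(String[] args) {
        JoinInfo crossJoin = new JoinInfo(0, false); // no keys, no residual condition
        System.out.println(matchesOld(crossJoin));   // false: cross join wrongly rejected
        System.out.println(matchesNew(crossJoin));   // true
    }
}
```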
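The names-list bug can be shown with a self-contained sketch. Root and rewire here are hypothetical simplified stand-ins for the Substrait Plan.Root and the convertor's rewire method, kept only to show the arity mismatch:

```java
import java.util.List;

/**
 * Sketch of the names-list fix: the wrapper root must emit its OWN
 * output names, not the inner plan's. Types are simplified stand-ins,
 * not the real Substrait API.
 */
public class NamesListSketch {
    record Root(List<String> names, int outputColumns) {
        // DataFusion's check, roughly: names list must match the schema arity.
        boolean namesMatchSchema() { return names.size() == outputColumns; }
    }

    // Fixed behavior: take the wrapper's own names instead of reusing innerNames.
    static Root rewire(List<String> innerNames, int wrapperColumns, List<String> wrapperNames) {
        return new Root(wrapperNames, wrapperColumns);
    }

    public static void main(String[] args) {
        // Inner plan: a 4-column join output.
        List<String> innerNames = List.of("id", "name", "id0", "dept");
        // Wrapper: an Aggregate shrinking the join to 1 column.
        Root buggy = new Root(innerNames, 1);               // old code reused innerNames
        Root fixed = rewire(innerNames, 1, List.of("count"));
        System.out.println(buggy.namesMatchSchema());       // false -> DataFusion rejects
        System.out.println(fixed.namesMatchSchema());       // true
    }
}
```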
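The close-ordering fix can be sketched with a toy sink. This is a hypothetical simplification (String batches stand in for Arrow VectorSchemaRoots; method names besides close() are invented), not the real RowProducingSink:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the terminal-sink fix: close() is a no-op so buffered
 * batches survive the stage's terminal transition; releaseUnread()
 * frees them only on the error path.
 */
public class TerminalSinkSketch implements AutoCloseable {
    private final List<String> batches = new ArrayList<>(); // stand-in for VectorSchemaRoot buffer

    void feed(String batch) { batches.add(batch); }

    @Override
    public void close() {
        // Intentionally a no-op: the downstream consumer owns per-batch cleanup.
        // The old implementation released and cleared the buffer here, so a
        // completion listener reading during the terminal transition saw 0 rows.
    }

    synchronized void releaseUnread() {
        // Error path: drop (and, in the real sink, release) buffered batches.
        batches.clear();
    }

    int bufferedBatches() { return batches.size(); }

    public static void main(String[] args) {
        TerminalSinkSketch sink = new TerminalSinkSketch();
        sink.feed("batch-1");
        sink.close();                              // terminal transition closes the sink...
        System.out.println(sink.bufferedBatches()); // ...but the consumer still sees 1 batch
        sink.releaseUnread();
        System.out.println(sink.bufferedBatches()); // 0
    }
}
```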

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

github-actions Bot commented May 5, 2026

PR Reviewer Guide 🔍

(Review updated until commit b88d249)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Planner-side OpenSearchJoin rel and rule

Relevant files:

  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rel/OpenSearchJoin.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchJoinRule.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/JoinRuleTests.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/EngineCapability.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/RelNodeUtils.java

Sub-PR theme: DAG builder and fragment conversion for multi-input joins

Relevant files:

  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/DAGBuilder.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/MultiInputShape.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/PlanForker.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/dag/MultiInputShapeTests.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionFragmentConvertor.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DataFusionFragmentConvertorTests.java

Sub-PR theme: Integration and end-to-end tests for coordinator hash joins

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorJoinIT.java
  • sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorJoinMultiNodeIT.java
  • sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/JoinCommandIT.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/RowProducingSinkTests.java

⚡ Recommended focus areas for review

Error Handling

In onMatch, when viableBackends is empty after intersecting with join-capable backends, an IllegalStateException is thrown. This will surface as an unhandled planner exception rather than a graceful "unsupported" fallback. Consider returning without transforming (letting the LogicalJoin survive) so the planner can report a cleaner "no plan found" error, consistent with how testPureNonEquiJoinDoesNotMatch handles the non-equi case.

if (viableBackends.isEmpty()) {
    throw new IllegalStateException("No backend supports JOIN among viable backends after intersecting inputs");
}
Null Sink Provider

In cutSingleton, when grandchildren is non-empty (intermediate coordinator stage), childSinkProvider is resolved via reduceViable.getFirst(). If reduceViable is empty (no backend supports reduce capability), getFirst() will throw a NoSuchElementException with no contextual error message. A guard with a descriptive exception should be added before calling getFirst().

ExchangeSinkProvider childSinkProvider = null;
if (!grandchildren.isEmpty()) {
    List<String> reduceViable = CapabilityResolutionUtils.filterByReduceCapability(registry, reducer.getViableBackends());
    childSinkProvider = registry.getBackend(reduceViable.getFirst()).getExchangeSinkProvider();
}
Silent Skip

In crossProduct, when a child alternative's backend disagrees with agreedBackend, the alternative is silently skipped via continue. If all alternatives for a child are skipped (no agreement), the handler is never called and results remains empty — this is not detected until the caller checks for empty results. Consider adding a post-loop check that at least one combo was produced, or propagating a clearer error when no valid cross-product combination exists.

    if (agreedBackend != null && childContributesBackend && !agreedBackend.equals(alt.chosenBackend)) {
        continue;
    }
    String next = agreedBackend != null ? agreedBackend : (childContributesBackend ? alt.chosenBackend : null);
    partial.add(alt.node);
    crossProduct(childAlternativeSets, depth + 1, partial, next, handler);
    partial.removeLast();
}
Regression Risk

The derived-column filter path now throws IllegalStateException unconditionally instead of falling back to filterBackendsAnyFormat. This is a deliberate behavior change (fail-fast), but it means any existing query with a HAVING clause or filter on a post-aggregate/post-project column will now fail at planning time rather than executing. Ensure all callers that previously relied on the fallback path are accounted for, and that the test suite covers the regression boundary.

throw new IllegalStateException(
    "filter on derived column ["
        + storageInfo.getFieldName()
        + "] not supported: "
        + "delegation model for derived columns is not yet implemented"
);
Static Provisioning Flag

dataProvisioned is a static boolean field, which means it persists across test class instances within the same JVM. If the test suite is run in a mode where the static state is not reset between test runs (e.g., test retries or suite re-execution), provisioning will be skipped on subsequent runs even if the cluster was restarted. Consider using a @BeforeClass-equivalent or a per-suite setup mechanism instead.

private static boolean dataProvisioned = false;

/**
 * Lazily provision both calcs indices on first invocation. Called inside test
 * methods — {@code client()} is not available in {@code @BeforeClass}.
 */
private void ensureDataProvisioned() throws IOException {
    if (dataProvisioned == false) {
        DatasetProvisioner.provision(client(), CALCS);
        DatasetProvisioner.provision(client(), CALCS_ALT);
        dataProvisioned = true;
    }
}

@github-actions
Contributor

github-actions Bot commented May 5, 2026

PR Code Suggestions ✨

Latest suggestions up to b88d249

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Guard against empty reduce-capable backend list

When grandchildren is non-empty, reduceViable could be empty if no backend supports
the reduce capability, causing reduceViable.getFirst() to throw a
NoSuchElementException. Add a guard to throw a meaningful IllegalStateException when
no reduce-capable backend is found.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/DAGBuilder.java [118-123]

 TargetResolver targetResolver = grandchildren.isEmpty() ? new ShardTargetResolver(childFragment, clusterService) : null;
 ExchangeSinkProvider childSinkProvider = null;
 if (!grandchildren.isEmpty()) {
     List<String> reduceViable = CapabilityResolutionUtils.filterByReduceCapability(registry, reducer.getViableBackends());
+    if (reduceViable.isEmpty()) {
+        throw new IllegalStateException("No reduce-capable backend found among viable backends: " + reducer.getViableBackends());
+    }
     childSinkProvider = registry.getBackend(reduceViable.getFirst()).getExchangeSinkProvider();
 }
Suggestion importance[1-10]: 7


Why: When reduceViable is empty, reduceViable.getFirst() throws a NoSuchElementException with no context. Adding an explicit guard with a meaningful IllegalStateException improves debuggability and is a real correctness concern for multi-backend scenarios.

Medium
Avoid throwing inside onMatch for unplannable joins

When computeViableBackends returns an empty list (e.g. because one or both inputs
are not yet marked as OpenSearchRelNode), retainAll silently produces an empty list
and the rule throws. However, the HEP planner expects matches() to guard against
unplannable cases — throwing inside onMatch can abort the entire planning session.
Consider returning early (not transforming) instead of throwing when no viable
backends exist, so the planner can try other rules or report a cleaner error.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchJoinRule.java [98-103]

 List<String> viableBackends = computeViableBackends(join.getLeft(), join.getRight());
 List<String> joinCapable = context.getCapabilityRegistry().operatorBackends(EngineCapability.JOIN);
 viableBackends.retainAll(joinCapable);
 if (viableBackends.isEmpty()) {
-    throw new IllegalStateException("No backend supports JOIN among viable backends after intersecting inputs");
+    // No viable backend — do not transform; let the planner report a planning failure.
+    return;
 }
Suggestion importance[1-10]: 5


Why: Throwing inside onMatch can abort the HEP planning session rather than allowing the planner to try other rules or report a cleaner error. Returning early is a safer pattern, though in practice the matches() guard should prevent most unplannable cases from reaching onMatch.

Low
Prevent double-close of batches in error path

releaseUnread() does not guard against being called after readResult() has already
transferred ownership of the batches to the consumer. If the consumer closes the
batches and then releaseUnread() is called (e.g. due to a race on the error path),
each batch would be closed twice, causing a double-free. Consider tracking whether
ownership has been transferred and making releaseUnread() a no-op in that case.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/RowProducingSink.java [112-116]

 public synchronized void releaseUnread() {
+    if (batches.isEmpty()) {
+        return; // already drained by readResult() or previously released
+    }
     for (VectorSchemaRoot batch : batches) {
         batch.close();
     }
     batches.clear();
 }
Suggestion importance[1-10]: 4


Why: The early-return on batches.isEmpty() is a reasonable guard, but it doesn't fully prevent double-close if readResult() transfers ownership without clearing batches. The suggestion is partially correct but doesn't address the root cause of the potential race; however, it does reduce the risk of double-free in common cases.

Low
General
Detect and report empty cross-product plan alternatives

If crossProduct produces no valid combinations (all alternatives were pruned by the
backend-agreement filter), results will be empty and the caller will silently
receive no plan alternatives, which may cause a downstream NoSuchElementException or
silent planning failure. Add a check after crossProduct to throw a meaningful error
when no compatible backend combination was found.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/PlanForker.java [91-94]

 crossProduct(childAlternativeSets, 0, new ArrayList<>(), null, (chosenBackend, combo) -> {
     results.addAll(resolveOperator(node, combo, chosenBackend));
 });
+if (results.isEmpty()) {
+    throw new IllegalStateException(
+        "Multi-input operator [" + node.getClass().getSimpleName() + "] produced no plan alternatives: "
+        + "no combination of child alternatives agrees on a single backend"
+    );
+}
 return results;
Suggestion importance[1-10]: 5


Why: If all cross-product combinations are pruned by the backend-agreement filter, results will be empty and the caller may silently fail or throw a less informative error downstream. Adding an explicit check here improves diagnosability.

Low

Previous suggestions

Suggestions up to commit 1ad9dc0
Category | Suggestion | Impact
Possible issue
Unwrap HepRelVertex before instanceof checks in detection

The detection logic does not unwrap HepRelVertex wrappers before checking instanceof
OpenSearchExchangeReducer. In a HEP planner context, inputs are always wrapped in
HepRelVertex, so the check will always fail and detect() will always return
Optional.empty(), making isFinalAggBoundary never return true for multi-input
shapes. Add HEP unwrapping before the instanceof checks.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/MultiInputShape.java [49-65]

 public static Optional<MultiInputShape> detect(RelNode node) {
     List<RelNode> inputs = node.getInputs();
     if (inputs.size() < 2) {
         return Optional.empty();
     }
     List<OpenSearchStageInputScan> scans = new ArrayList<>(inputs.size());
     for (RelNode input : inputs) {
-        if (!(input instanceof OpenSearchExchangeReducer reducer)) {
+        RelNode unwrappedInput = input instanceof org.apache.calcite.plan.hep.HepRelVertex v ? v.getCurrentRel() : input;
+        if (!(unwrappedInput instanceof OpenSearchExchangeReducer reducer)) {
             return Optional.empty();
         }
-        if (!(reducer.getInput() instanceof OpenSearchStageInputScan scan)) {
+        RelNode reducerInput = reducer.getInput() instanceof org.apache.calcite.plan.hep.HepRelVertex v2 ? v2.getCurrentRel() : reducer.getInput();
+        if (!(reducerInput instanceof OpenSearchStageInputScan scan)) {
             return Optional.empty();
         }
         scans.add(scan);
     }
     return Optional.of(new MultiInputShape(scans));
 }
Suggestion importance[1-10]: 8


Why: In a HEP planner context, node.getInputs() returns HepRelVertex-wrapped nodes, so the instanceof OpenSearchExchangeReducer check will always fail without unwrapping. This would make isFinalAggBoundary never return true for multi-input shapes (Union, Join), breaking the entire coordinator-fragment conversion path for joins.

Medium
Guard against empty reduce-capable backends list

When grandchildren is non-empty but reduceViable is empty (no backend supports
reduce capability), reduceViable.getFirst() will throw a NoSuchElementException,
leaving the stage in an inconsistent state with no meaningful error message. Add a
guard to throw an IllegalStateException with a descriptive message before calling
getFirst().

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/DAGBuilder.java [118-123]

 TargetResolver targetResolver = grandchildren.isEmpty() ? new ShardTargetResolver(childFragment, clusterService) : null;
 ExchangeSinkProvider childSinkProvider = null;
 if (!grandchildren.isEmpty()) {
     List<String> reduceViable = CapabilityResolutionUtils.filterByReduceCapability(registry, reducer.getViableBackends());
+    if (reduceViable.isEmpty()) {
+        throw new IllegalStateException(
+            "No backend supports reduce capability among viable backends: " + reducer.getViableBackends()
+        );
+    }
     childSinkProvider = registry.getBackend(reduceViable.getFirst()).getExchangeSinkProvider();
 }
Suggestion importance[1-10]: 7


Why: When reduceViable is empty, getFirst() throws a NoSuchElementException with no context. Adding an explicit guard with a descriptive IllegalStateException improves debuggability significantly for a realistic failure mode.

Medium
Avoid throwing in rule match when no viable backends exist

When computeViableBackends returns an empty list (e.g. the inputs haven't been
marked yet and viableBackendsOf returns List.of()), the intersection is empty and
the rule throws, preventing the planner from even attempting to plan the join. The
rule should return early (not match) instead of throwing, so the planner can try
other rules or report a proper planning failure.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchJoinRule.java [101-103]

 if (viableBackends.isEmpty()) {
-    throw new IllegalStateException("No backend supports JOIN among viable backends after intersecting inputs");
+    return; // no backend supports JOIN — let the planner report a proper failure
 }
Suggestion importance[1-10]: 6


Why: Throwing inside onMatch (which is only called after matches() returns true) is unusual and can cause the planner to abort rather than try other rules. Returning early is the conventional HEP rule pattern for "can't apply", though the practical impact depends on whether other join rules exist.

Low
General
Fix non-thread-safe static provisioning flag

The dataProvisioned static flag is not thread-safe. If tests run concurrently,
multiple threads could pass the if check simultaneously and provision the datasets
multiple times, potentially causing index-already-exists errors. Use a volatile flag
or synchronize the provisioning block.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/JoinCommandIT.java [51-63]

-private static boolean dataProvisioned = false;
+private static volatile boolean dataProvisioned = false;
 
-/**
- * Lazily provision both calcs indices on first invocation. Called inside test
- * methods — {@code client()} is not available in {@code @BeforeClass}.
- */
-private void ensureDataProvisioned() throws IOException {
+private synchronized void ensureDataProvisioned() throws IOException {
     if (dataProvisioned == false) {
         DatasetProvisioner.provision(client(), CALCS);
         DatasetProvisioner.provision(client(), CALCS_ALT);
         dataProvisioned = true;
     }
 }
Suggestion importance[1-10]: 4


Why: The dataProvisioned flag is not volatile or synchronized, which could cause double-provisioning under concurrent test execution. However, REST integration tests typically run sequentially, so the practical risk is low.

Low
Suggestions up to commit cc7e347
Category | Suggestion | Impact
Possible issue
Unwrap HepRelVertex before instanceof checks

The detect method does not unwrap HepRelVertex wrappers before checking if an input
is an OpenSearchExchangeReducer. In a HEP planner context, inputs are typically
wrapped in HepRelVertex, so the instanceof check will always fail and detect will
always return Optional.empty(), breaking the multi-input shape detection for Union
and Join coordinator fragments.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/MultiInputShape.java [49-65]

 public static Optional<MultiInputShape> detect(RelNode node) {
     List<RelNode> inputs = node.getInputs();
     if (inputs.size() < 2) {
         return Optional.empty();
     }
     List<OpenSearchStageInputScan> scans = new ArrayList<>(inputs.size());
     for (RelNode input : inputs) {
-        if (!(input instanceof OpenSearchExchangeReducer reducer)) {
+        RelNode unwrapped = input instanceof HepRelVertex v ? v.getCurrentRel() : input;
+        if (!(unwrapped instanceof OpenSearchExchangeReducer reducer)) {
             return Optional.empty();
         }
-        if (!(reducer.getInput() instanceof OpenSearchStageInputScan scan)) {
+        RelNode reducerInput = reducer.getInput() instanceof HepRelVertex v ? v.getCurrentRel() : reducer.getInput();
+        if (!(reducerInput instanceof OpenSearchStageInputScan scan)) {
             return Optional.empty();
         }
         scans.add(scan);
     }
     return Optional.of(new MultiInputShape(scans));
 }
Suggestion importance[1-10]: 7


Why: This is a valid concern — MultiInputShape.detect is called from FragmentConversionDriver which operates on the post-planning tree (Volcano output), not HEP. However, if the DAG builder operates on the Volcano output where HepRelVertex wrappers are absent, the issue may not manifest. The suggestion is still worth considering as a defensive measure for correctness.

Medium
Guard against empty reduce-capable backends list

When grandchildren is non-empty, reduceViable could be empty if no backend supports
the reduce capability, causing reduceViable.getFirst() to throw a
NoSuchElementException. Add a guard to throw a meaningful error when no viable
reduce backend is found.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/DAGBuilder.java [118-123]

 TargetResolver targetResolver = grandchildren.isEmpty() ? new ShardTargetResolver(childFragment, clusterService) : null;
 ExchangeSinkProvider childSinkProvider = null;
 if (!grandchildren.isEmpty()) {
     List<String> reduceViable = CapabilityResolutionUtils.filterByReduceCapability(registry, reducer.getViableBackends());
+    if (reduceViable.isEmpty()) {
+        throw new IllegalStateException("No backend supports reduce capability among viable backends: " + reducer.getViableBackends());
+    }
     childSinkProvider = registry.getBackend(reduceViable.getFirst()).getExchangeSinkProvider();
 }
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies that reduceViable.getFirst() can throw a NoSuchElementException when no backend supports reduce capability. Adding a guard with a meaningful error message improves debuggability and prevents cryptic failures.

Low
Avoid throwing when join inputs are not yet marked

When computeViableBackends returns an empty list (e.g. inputs are not yet marked as
OpenSearchRelNode), retainAll on an empty list still produces an empty list and the
rule throws. However, the real issue is that the rule should return false from
matches() rather than throw during onMatch() when inputs are not yet in
OpenSearchConvention, to allow the planner to retry later rather than abort planning
entirely.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchJoinRule.java [98-103]

 List<String> viableBackends = computeViableBackends(join.getLeft(), join.getRight());
+if (viableBackends.isEmpty()) {
+    return; // inputs not yet marked — let HEP retry after child rules fire
+}
 List<String> joinCapable = context.getCapabilityRegistry().operatorBackends(EngineCapability.JOIN);
 viableBackends.retainAll(joinCapable);
 if (viableBackends.isEmpty()) {
     throw new IllegalStateException("No backend supports JOIN among viable backends after intersecting inputs");
 }
Suggestion importance[1-10]: 4


Why: The suggestion has merit in that an empty viableBackends from computeViableBackends could indicate unmarked inputs, but the HEP bottom-up traversal guarantees children are already marked before this rule fires. Silently returning from onMatch could hide real configuration errors; the existing throw is more appropriate for the actual case where no backend supports JOIN.

Low
General
Fix non-thread-safe static provisioning flag

The dataProvisioned static flag is not thread-safe and is never reset between test
class runs. If tests run in parallel or the test class is reused across JVM forks,
provisioning may be skipped or partially applied. Use a @BeforeClass-equivalent or
synchronize access to the flag to ensure correct provisioning.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/JoinCommandIT.java [51-63]

-private static boolean dataProvisioned = false;
+private static volatile boolean dataProvisioned = false;
 
-private void ensureDataProvisioned() throws IOException {
+private synchronized void ensureDataProvisioned() throws IOException {
     if (dataProvisioned == false) {
         DatasetProvisioner.provision(client(), CALCS);
         DatasetProvisioner.provision(client(), CALCS_ALT);
         dataProvisioned = true;
     }
 }
Suggestion importance[1-10]: 3


Why: The thread-safety concern is valid but integration tests typically run sequentially in a single thread. The synchronized + volatile fix is a minor improvement for correctness in parallel test execution scenarios, but has low practical impact for standard test runs.

Low
Suggestions up to commit be06fac
Category | Suggestion | Impact
Possible issue
Guard against empty viable backends list

When reduceViable is empty (no backend supports the reduce capability for the given
viable backends), reduceViable.getFirst() will throw a NoSuchElementException with
no context. This should be guarded with an explicit check and a meaningful error
message to help diagnose misconfigured capability registries.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/DAGBuilder.java [118-123]

 TargetResolver targetResolver = grandchildren.isEmpty() ? new ShardTargetResolver(childFragment, clusterService) : null;
 ExchangeSinkProvider childSinkProvider = null;
 if (!grandchildren.isEmpty()) {
     List<String> reduceViable = CapabilityResolutionUtils.filterByReduceCapability(registry, reducer.getViableBackends());
+    if (reduceViable.isEmpty()) {
+        throw new IllegalStateException(
+            "No backend supports reduce capability for stage "
+                + childStageId
+                + "; viable backends were: "
+                + reducer.getViableBackends()
+        );
+    }
     childSinkProvider = registry.getBackend(reduceViable.getFirst()).getExchangeSinkProvider();
 }
Suggestion importance[1-10]: 7


Why: The reduceViable.getFirst() call will throw a NoSuchElementException with no context if reduceViable is empty. Adding an explicit guard with a meaningful error message is a valid defensive improvement that aids debugging of misconfigured capability registries.

Medium
Fix non-atomic idempotency check in ChildSink close

The childClosed flag is checked and set non-atomically, creating a TOCTOU race
condition. If two threads call close() concurrently, both may read false before
either sets true, causing sender.close() to be called twice. Use an AtomicBoolean
with compareAndSet to make the idempotency guarantee thread-safe.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReduceSink.java [213]

-private volatile boolean childClosed;
+private final java.util.concurrent.atomic.AtomicBoolean childClosed = new java.util.concurrent.atomic.AtomicBoolean(false);
Suggestion importance[1-10]: 6


Why: The childClosed volatile flag has a TOCTOU race if two threads call close() concurrently, but in practice ChildSink.close() is called by the orchestrator's single drain/close path, making concurrent calls unlikely. The suggestion is valid but the real-world impact is limited.

Low
Broaden exception catch to prevent stage wedging

Catching only IllegalStateException is too narrow — outputSink.feed(vsr) could throw
other runtime exceptions (e.g., ArrowException, NullPointerException) that should
also be handled to prevent the inFlight counter from never reaching zero and wedging
the stage. Additionally, when an exception other than a "sink closed"
IllegalStateException is caught, the failure should be captured so the stage
transitions to FAILED rather than silently dropping the error.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/ShardFragmentStageExecution.java [119-127]

-} catch (IllegalStateException closed) {
-    // QueryContext closed while this response was in flight — another
-    // shard failed and tore down teardown. Silently drop; onShardTerminated
-    // still needs to run so inFlight drains and the stage's terminal
-    // transition isn't wedged.
+} catch (Exception ex) {
+    // Capture unexpected errors so the stage transitions to FAILED.
+    // For the "sink already closed" case (QueryContext torn down) we
+    // still need onShardTerminated() to drain inFlight.
+    captureFailure(ex);
     if (isLast) {
         onShardTerminated();
     }
 }
Suggestion importance[1-10]: 6


Why: Catching only IllegalStateException is indeed too narrow — other runtime exceptions from outputSink.feed(vsr) could leave inFlight undrainable and wedge the stage. Broadening to Exception and capturing the failure is a valid correctness improvement, though the improved_code changes the semantics slightly by always capturing the failure rather than distinguishing the "closed sink" case.

Low
Ensure sender closed exactly once under concurrency

With the AtomicBoolean fix, the close() method should use compareAndSet(false, true)
to atomically transition from open to closed, ensuring sender.close() is called
exactly once even under concurrent invocations.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReduceSink.java [225-235]

 @Override
 public void close() {
-    if (childClosed) {
+    if (!childClosed.compareAndSet(false, true)) {
         return;
     }
-    childClosed = true;
     try {
         sender.close();
     } catch (Throwable t) {
         logger.warn("[ReduceSink] error closing child sender", t);
     }
 }
Suggestion importance[1-10]: 5


Why: This is a follow-on to suggestion 1 and correctly shows how compareAndSet would be used with an AtomicBoolean. However, it depends on the first suggestion being applied and the standalone impact is moderate since concurrent close() calls on ChildSink are unlikely in the described lifecycle.

Low
Fail fast on unexpected reducer input type

The detect method checks that every child of the node is an
OpenSearchExchangeReducer wrapping an OpenSearchStageInputScan. After
DAGBuilder.cutSingleton runs with the new recursive sever(), each
OpenSearchExchangeReducer's input is replaced with a StageInputScan placeholder
once its stage has been cut, so by the time
FragmentConversionDriver.isFinalAggBoundary executes, the tree has already been
severed and the inner input of each reducer should indeed be a StageInputScan.
However, if reducer.getInput() returns something other than
OpenSearchStageInputScan (e.g., a nested plan that wasn't fully severed), detect
silently returns Optional.empty() instead of failing fast, which could cause the
wrong conversion path to be taken. Consider adding a warning log or assertion
when a reducer's input is unexpectedly not a StageInputScan, to surface
misconfigured plans early.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/MultiInputShape.java [49-65]

 public static Optional<MultiInputShape> detect(RelNode node) {
     List<RelNode> inputs = node.getInputs();
     if (inputs.size() < 2) {
         return Optional.empty();
     }
     List<OpenSearchStageInputScan> scans = new ArrayList<>(inputs.size());
     for (RelNode input : inputs) {
         if (!(input instanceof OpenSearchExchangeReducer reducer)) {
             return Optional.empty();
         }
-        if (!(reducer.getInput() instanceof OpenSearchStageInputScan scan)) {
-            return Optional.empty();
+        RelNode reducerInput = reducer.getInput();
+        if (!(reducerInput instanceof OpenSearchStageInputScan scan)) {
+            // Reducer's input was not severed to a StageInputScan — this indicates
+            // an incomplete DAG cut. Fail fast rather than silently falling through
+            // to the wrong conversion path.
+            throw new IllegalStateException(
+                "Expected OpenSearchStageInputScan under ExchangeReducer in multi-input shape, got: "
+                    + reducerInput.getClass().getSimpleName()
+            );
         }
         scans.add(scan);
     }
     return Optional.of(new MultiInputShape(scans));
 }
Suggestion importance[1-10]: 4


Why: The suggestion proposes throwing an exception when a reducer's input is not a StageInputScan, but the current silent Optional.empty() return is intentional — detect is designed to return empty when the shape doesn't match, not to fail. Changing this to throw would break callers that legitimately encounter reducers with non-StageInputScan inputs during intermediate planning phases.

Low
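To make the Optional contract concrete (independent of the Calcite types above), here is a minimal, hypothetical model of an Optional-returning shape detector; Node, Reducer, and Scan are illustrative stand-ins, not the real RelNode, OpenSearchExchangeReducer, or OpenSearchStageInputScan. A shape mismatch is a normal "no match" outcome, so callers can fall through to other conversion strategies:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ShapeDetect {
    // Hypothetical stand-ins for the real planner node types.
    interface Node {}
    static class Scan implements Node {}
    static class Other implements Node {}
    static class Reducer implements Node {
        final Node input;
        Reducer(Node input) { this.input = input; }
    }

    // Optional-style detect: returning empty signals "shape doesn't match",
    // not an error, mirroring the contract defended in the review comment.
    static Optional<List<Scan>> detect(List<Node> inputs) {
        if (inputs.size() < 2) return Optional.empty();
        List<Scan> scans = new ArrayList<>();
        for (Node n : inputs) {
            if (!(n instanceof Reducer)) return Optional.empty();
            Node inner = ((Reducer) n).input;
            if (!(inner instanceof Scan)) return Optional.empty(); // intentional non-match
            scans.add((Scan) inner);
        }
        return Optional.of(scans);
    }

    public static void main(String[] args) {
        System.out.println(detect(List.of(new Reducer(new Scan()), new Reducer(new Scan()))).isPresent());  // true
        System.out.println(detect(List.of(new Reducer(new Other()), new Reducer(new Scan()))).isPresent()); // false
    }
}
```

Whether a mismatch should throw instead depends on whether any caller legitimately probes with unsevered plans; the Optional contract keeps that decision at the call site.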
General
Defensively copy mutable list in constructor

The viableBackends list is stored directly without defensive copying. If the caller
mutates the list after construction (e.g. retainAll in OpenSearchUnionRule), the
stored reference will reflect those mutations. Wrap it in an unmodifiable copy to
preserve immutability.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rel/OpenSearchUnion.java [43-46]

 public OpenSearchUnion(RelOptCluster cluster, RelTraitSet traitSet, List<RelNode> inputs, boolean all, List<String> viableBackends) {
     super(cluster, traitSet, List.of(), inputs, all);
-    this.viableBackends = viableBackends;
+    this.viableBackends = List.copyOf(viableBackends);
 }
Suggestion importance[1-10]: 5


Why: The viableBackends list passed to the constructor could be mutated by the caller after construction (e.g., retainAll in OpenSearchUnionRule). Using List.copyOf is a valid defensive practice to ensure immutability of the stored reference.

Low
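As a self-contained sketch of why List.copyOf helps here (class and field names are illustrative, not the real OpenSearchUnion), the copy is both detached from the caller's list and unmodifiable:

```java
import java.util.ArrayList;
import java.util.List;

public class DefensiveCopyDemo {
    private final List<String> viableBackends;

    // List.copyOf takes an unmodifiable snapshot, so later caller-side
    // mutation (e.g. retainAll in a planner rule) cannot leak in here.
    DefensiveCopyDemo(List<String> viableBackends) {
        this.viableBackends = List.copyOf(viableBackends);
    }

    List<String> backends() { return viableBackends; }

    public static void main(String[] args) {
        List<String> caller = new ArrayList<>(List.of("datafusion", "lucene"));
        DefensiveCopyDemo node = new DefensiveCopyDemo(caller);

        caller.retainAll(List.of("lucene")); // caller mutates after construction
        System.out.println(node.backends()); // prints [datafusion, lucene]

        try {
            node.backends().add("other");    // the stored copy is unmodifiable
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable");
        }
    }
}
```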
Ensure join rule fires after filter pushdown

OpenSearchJoinRule is added before OpenSearchSortRule and OpenSearchUnionRule, but
join planning typically depends on filter and project pushdown having already been
applied to the join inputs. If OpenSearchJoinRule fires before OpenSearchFilterRule
has had a chance to push filters into the join branches, the join's viable backends
may be over-estimated. Verify that the HEP planner's rule ordering here matches the
intended optimization sequence — specifically that filter/project rules fire before
the join rule.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/PlannerImpl.java [102-108]

 new OpenSearchFilterRule(context),
 new OpenSearchProjectRule(context),
 new OpenSearchAggregateRule(context),
-new OpenSearchJoinRule(context),
 new OpenSearchSortRule(context),
-new OpenSearchUnionRule(context)
+new OpenSearchUnionRule(context),
+new OpenSearchJoinRule(context)
Suggestion importance[1-10]: 4


Why: The suggestion raises a valid concern about rule ordering in HEP planners, but HEP planners apply rules in a fixed sequence and the interaction between OpenSearchJoinRule and OpenSearchFilterRule depends on the specific planner configuration. Without more context on how the HEP planner is configured (e.g., HepMatchOrder), this is speculative and may not reflect an actual issue.

Low
Fix race condition in lazy data provisioning flag

The dataProvisioned static flag is not thread-safe. If test methods run
concurrently, multiple threads may read false simultaneously and each attempt to
provision the dataset, causing duplicate provisioning failures. Use a static
volatile or synchronize the check-and-set block to prevent concurrent provisioning.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/AppendCommandIT.java [57-71]

-private static boolean dataProvisioned = false;
+private static volatile boolean dataProvisioned = false;
 
-private void ensureDataProvisioned() throws IOException {
+private static synchronized void ensureDataProvisioned() throws IOException {
     if (dataProvisioned == false) {
         DatasetProvisioner.provision(client(), CALCS);
         DatasetProvisioner.provision(client(), CALCS_ALT);
         dataProvisioned = true;
     }
 }
Suggestion importance[1-10]: 3


Why: Integration test methods in OpenSearch's test framework typically run sequentially within a class, making concurrent provisioning unlikely. Adding volatile and synchronized is a minor defensive improvement but has low practical impact in this context.

Low
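A minimal, self-contained sketch of the thread-safe check-and-set (provision() is a hypothetical stand-in for DatasetProvisioner.provision). Because the flag is static, the lock must be static too; synchronizing an instance method would let two test-class instances provision concurrently:

```java
public class LazyProvisioning {
    private static volatile boolean dataProvisioned = false;
    private static final Object LOCK = new Object();
    static int provisionCalls = 0; // counter for the demo only

    // Hypothetical stand-in for the real dataset provisioning call.
    private static void provision() {
        provisionCalls++;
    }

    static void ensureDataProvisioned() {
        if (!dataProvisioned) {             // fast path: no lock once provisioned
            synchronized (LOCK) {           // static lock, because the flag is static
                if (!dataProvisioned) {     // re-check under the lock
                    provision();
                    dataProvisioned = true;
                }
            }
        }
    }

    public static void main(String[] args) {
        ensureDataProvisioned();
        ensureDataProvisioned();
        System.out.println(provisionCalls); // prints 1
    }
}
```

The volatile read on the fast path keeps repeated calls cheap once provisioning has happened, while the second check under the lock prevents double provisioning during the initial race.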
Suggestions up to commit ad66195
Category | Suggestion | Impact
Possible issue
Fix race condition in ChildSink close idempotency check

The childClosed field is declared volatile boolean and checked with a non-atomic
read-then-write pattern, creating a race condition where two concurrent threads can
both pass the if (childClosed) check before either sets it to true. This could cause
sender.close() to be called twice concurrently. Use an AtomicBoolean with
compareAndSet to make the idempotency guarantee thread-safe.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReduceSink.java [225-235]

 private final class ChildSink implements ExchangeSink {
     private final DatafusionPartitionSender sender;
-    private volatile boolean childClosed;
+    private final java.util.concurrent.atomic.AtomicBoolean childClosed = new java.util.concurrent.atomic.AtomicBoolean(false);
     ...
     @Override
     public void close() {
-        if (childClosed) {
+        if (!childClosed.compareAndSet(false, true)) {
             return;
         }
-        childClosed = true;
Suggestion importance[1-10]: 7


Why: The volatile boolean childClosed with a non-atomic read-then-write pattern is a genuine race condition — two concurrent threads can both pass the if (childClosed) check before either sets it to true, causing sender.close() to be called twice. Using AtomicBoolean.compareAndSet would fix this correctly.

Medium
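A runnable sketch of the compareAndSet pattern in isolation (IdempotentClose and closeSender are illustrative names, not the real ChildSink or sender API). Only the thread that wins the CAS proceeds to close the underlying resource:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentClose implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    int closeCount = 0; // per-sink counter, for the demo only

    // Hypothetical stand-in for the underlying sender.close() call.
    private void closeSender() {
        closeCount++;
    }

    @Override
    public void close() {
        // compareAndSet atomically flips false -> true exactly once, so
        // concurrent callers race on the CAS instead of a check-then-act.
        if (!closed.compareAndSet(false, true)) {
            return; // another caller already won the CAS; nothing to do
        }
        closeSender();
    }

    public static void main(String[] args) throws InterruptedException {
        IdempotentClose sink = new IdempotentClose();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(sink::close);
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(sink.closeCount); // prints 1
    }
}
```

The CAS both publishes the closed state and arbitrates the race, so no separate volatile write or synchronized block is needed.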
Fix asymmetric null backend handling in multi-input resolution

When agreedBackend is set from the first child and that first child's chosenBackend
is null or empty, subsequent children with a non-null backend will not match
agreedBackend (which is null), causing a false backend disagreement error. The
null/empty check should be applied symmetrically when setting agreedBackend from the
first child as well.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/PlanForker.java [92-96]

-if (agreedBackend == null) {
+if (agreedBackend == null && childAlt.chosenBackend != null && !childAlt.chosenBackend.isEmpty()) {
     agreedBackend = childAlt.chosenBackend;
-} else if (childAlt.chosenBackend != null
+} else if (agreedBackend != null
+    && childAlt.chosenBackend != null
     && !childAlt.chosenBackend.isEmpty()
     && !childAlt.chosenBackend.equals(agreedBackend)) {
Suggestion importance[1-10]: 7


Why: This is a valid logic bug: if the first child has a null or empty chosenBackend, agreedBackend remains null, and a subsequent child with a valid backend will not trigger the disagreement check but also won't set agreedBackend, potentially masking backend mismatches. The fix correctly applies symmetric null/empty checks when setting agreedBackend.

Medium
Guard against silent data loss for multi-input non-boundary nodes

After the multi-input boundary check was generalized to handle Union (all children
are ExchangeReducers), the fallback "attach on top" path still unconditionally uses
node.getInputs().getFirst(). If a node above the boundary has multiple inputs but
not all are ExchangeReducers, this silently discards all inputs except the first,
producing an incorrect plan. This should assert or validate that exactly one input
exists before calling getFirst().

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [245-247]

 // Operator above the final-fragment boundary — convert child first, then attach.
-byte[] innerBytes = convertReduceNode(node.getInputs().getFirst(), convertor, false, delegationBytes);
+List<RelNode> inputs = node.getInputs();
+if (inputs.size() != 1) {
+    throw new IllegalStateException(
+        "Expected exactly one input for attach-on-top node " + node.getClass().getSimpleName() + ", got " + inputs.size()
+    );
+}
+byte[] innerBytes = convertReduceNode(inputs.getFirst(), convertor, false, delegationBytes);
 return convertor.attachFragmentOnTop(strippedNode, innerBytes);
Suggestion importance[1-10]: 6


Why: The fallback getFirst() call after the multi-input boundary check could silently discard inputs for nodes with multiple non-ER inputs, producing incorrect plans. Adding an explicit size assertion would catch this programming error early with a clear message.

Low
General
Avoid unnecessary reader creation on non-refresh cycles

The original guard if (didRefresh == false) return; was removed, meaning
afterRefresh now runs on every invocation regardless of whether a refresh actually
occurred. This could cause unnecessary DatafusionReader creation and readers.put
calls on every refresh cycle even when nothing changed, potentially causing
performance issues or resource leaks. Consider adding back a check for didRefresh
while still handling the empty-shard case separately (e.g., by checking if readers
is empty as a fallback).

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReaderManager.java [93]

-// Register a reader even when no files were flushed (empty-shard case). Without
-// this, queries against an empty shard throw "No DataFusion reader available"
-// because refresh listeners are only invoked when didRefresh==true, but getReader
-// is called on every query regardless of whether any docs were ever indexed.
+if (didRefresh == false && readers.isEmpty() == false) return;
 if (readers.containsKey(catalogSnapshot)) return;
Suggestion importance[1-10]: 6


Why: The removal of if (didRefresh == false) return; means afterRefresh runs on every invocation, potentially causing unnecessary DatafusionReader creation. The suggestion to add back a conditional guard while still handling the empty-shard case is valid and addresses a real performance concern, though the readers.containsKey(catalogSnapshot) check already prevents duplicate creation for the same snapshot.

Low
Add logging for silently dropped in-flight responses

The catch block silently swallows IllegalStateException without any logging, making
it very difficult to diagnose issues where responses are unexpectedly dropped due to
reasons other than a legitimate teardown race. Additionally,
metrics.addRowsProcessed is skipped for the dropped batch, which may cause metrics
to be inaccurate. At minimum, a debug-level log should be added to aid diagnostics.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/ShardFragmentStageExecution.java [119-127]

 } catch (IllegalStateException closed) {
     // QueryContext closed while this response was in flight — another
    // shard failed and tore it down. Silently drop; onShardTerminated
     // still needs to run so inFlight drains and the stage's terminal
     // transition isn't wedged.
+    logger.debug("Dropping in-flight response after sink closed", closed);
     if (isLast) {
         onShardTerminated();
     }
 }
Suggestion importance[1-10]: 4


Why: Adding a debug-level log for the silently swallowed IllegalStateException is a reasonable diagnostic improvement, but the existing comment already explains the intent and the change has low functional impact.

Low
Improve error message clarity for all-empty Union inputs

When all inputs are empty Values and are dropped, viableBackends remains null. The
markedInputs.isEmpty() check correctly throws, but the error message doesn't
indicate that all inputs were empty Values nodes, which would be the actual cause. A
more descriptive message would significantly aid debugging.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchUnionRule.java [85-89]

 if (markedInputs.isEmpty()) {
     // Defensive — Calcite shouldn't construct a Union with all-empty inputs, but
     // surfacing a clear message beats letting downstream rules fail mysteriously.
-    throw new IllegalStateException("Union rule encountered Union with all-empty inputs");
+    throw new IllegalStateException(
+        "Union rule encountered Union with all-empty inputs (all " + union.getInputs().size()
+            + " inputs were empty Values nodes and were dropped)"
+    );
 }
Suggestion importance[1-10]: 3


Why: The improved error message adds useful context about why all inputs were dropped (they were empty Values nodes), but this is a minor diagnostic improvement with no functional impact.

Low
Cache single child stage ID alongside schema in constructor

The constructor already validates that childInputs.size() == 1 and caches the schema
as schemaIpc, but inputIdFor(singleChildStageId) re-iterates the map at close time.
Consider caching the single child stage ID in the constructor alongside schemaIpc to
avoid redundant map iteration and to make the relationship between the cached schema
and its stage ID explicit.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionMemtableReduceSink.java [93-94]

-int singleChildStageId = childInputs.keySet().iterator().next();
 NativeBridge.registerMemtable(session.getPointer(), inputIdFor(singleChildStageId), schemaIpc, arrayPtrs, schemaPtrs);
Suggestion importance[1-10]: 2


Why: The suggestion is a minor optimization to cache the stage ID in the constructor, but the improved_code is identical to the existing_code (it doesn't actually show the caching change), making the suggestion incomplete. The performance impact of a single map iteration is negligible.

Low

@github-actions
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for ad66195: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions Bot commented May 6, 2026

Persistent review updated to latest commit be06fac

@github-actions
Contributor

github-actions Bot commented May 6, 2026

Persistent review updated to latest commit cc7e347

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 7, 2026

✅ Gradle check result for cc7e347: SUCCESS

@codecov

codecov Bot commented May 7, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.50%. Comparing base (dcb68f9) to head (cc7e347).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21480      +/-   ##
============================================
+ Coverage     73.44%   73.50%   +0.05%     
- Complexity    74426    74483      +57     
============================================
  Files          5970     5970              
  Lines        338267   338267              
  Branches      48753    48753              
============================================
+ Hits         248451   248636     +185     
+ Misses        70042    69826     -216     
- Partials      19774    19805      +31     

☔ View full report in Codecov by Sentry.

@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 1ad9dc0

@github-actions
Contributor

github-actions Bot commented May 7, 2026

❌ Gradle check result for 1ad9dc0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit b88d249

@mch2 mch2 marked this pull request as ready for review May 7, 2026 02:09
@mch2 mch2 requested a review from a team as a code owner May 7, 2026 02:09
@mch2 mch2 changed the title Draft - Add coordinator side hash joins to analytics engine Add coordinator side hash joins to analytics engine May 7, 2026
@github-actions
Contributor

github-actions Bot commented May 7, 2026

❌ Gradle check result for b88d249: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
