
Add dynamic settings for indexed query execution path #21522

Open
alchemist51 wants to merge 8 commits into opensearch-project:main from alchemist51:opensearch-dynamic

Conversation

Contributor

@alchemist51 alchemist51 commented May 6, 2026

Description

  • Introduces 7 dynamic cluster settings for tuning the DataFusion indexed query execution path
  • Settings are updatable at runtime via PUT _cluster/settings with no node restart
  • Pre-computes a WireConfigSnapshot (volatile field, rebuilt atomically on change) so the query hot path pays only a single volatile read — no ClusterService lookup per query
  • The snapshot is serialized into a 68-byte #[repr(C)] struct and passed to the Rust runtime via FFM
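
A minimal sketch of the snapshot pattern the description refers to (class and method names are illustrative, not the PR's actual code): settings listeners publish a fresh immutable snapshot, and the query hot path pays only a single volatile read.

```java
// Hypothetical sketch of the volatile-snapshot pattern: listeners rebuild an
// immutable snapshot; readers never consult ClusterService per query.
final class SettingsHolder {
    static final class Snapshot {
        final int targetPartitions;
        Snapshot(int targetPartitions) { this.targetPartitions = targetPartitions; }
    }

    private volatile Snapshot snapshot = new Snapshot(1);

    // Rare path: a cluster-settings listener atomically publishes a new snapshot.
    synchronized void onSettingChanged(int newTargetPartitions) {
        snapshot = new Snapshot(newTargetPartitions);
    }

    // Hot path: a single volatile read per query.
    Snapshot current() { return snapshot; }
}
```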

Related Issues

Resolves #21536

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Contributor

github-actions Bot commented May 6, 2026

PR Reviewer Guide 🔍

(Review updated until commit 636c8bf)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add WireConfigSnapshot immutable struct and FFM serialization

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WireConfigSnapshot.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/WireConfigSnapshotTests.java

Sub-PR theme: Add dynamic cluster settings for indexed query execution path

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DatafusionSettingsTests.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DatafusionSettingsPropertyTests.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DataFusionPluginSettingsTests.java
  • sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/DatafusionDynamicSettingsIT.java

Sub-PR theme: Wire query config snapshot through FFM to native session context creation

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ShardScanInstructionHandler.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextConfig.java
  • sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

⚡ Recommended focus areas for review

Race Condition

The registerListeners method updates maxSliceCount and concurrentSearchMode as separate volatile writes before rebuilding the snapshot. If two settings change concurrently (e.g., both CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING and CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE fire at the same time), there is a window where this.maxSliceCount is updated but the snapshot still reflects the old concurrentSearchMode, or vice versa. The snapshot rebuild is not atomic with respect to both volatile fields together.

clusterSettings.addSettingsUpdateConsumer(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, newValue -> {
    this.maxSliceCount = newValue;
    snapshot = WireConfigSnapshot.builder(snapshot)
        .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
        .build();
});

clusterSettings.addSettingsUpdateConsumer(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE, newValue -> {
    this.concurrentSearchMode = newValue;
    snapshot = WireConfigSnapshot.builder(snapshot).targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount)).build();
});
Zero Partition Risk

In deriveTargetPartitions, when maxSliceCount == 0 the method returns Runtime.getRuntime().availableProcessors() / 2. On a single-core machine this evaluates to 0, which could cause downstream issues in DataFusion (zero target partitions). A Math.max(1, ...) guard should be applied.

if (maxSliceCount == 0) {
    return Runtime.getRuntime().availableProcessors() / 2;
}
Arena Lifetime

The Arena is closed immediately after createSessionContext returns, but the native side receives a raw pointer (segment.address()) to memory that was allocated from that arena. If the Rust side stores or dereferences this pointer after the arena is closed (i.e., after the try-with-resources block exits), this will result in a use-after-free. The arena must remain open for at least as long as the native code accesses the pointed-to memory.

try (Arena arena = Arena.ofConfined()) {
    MemorySegment segment = arena.allocate(WireConfigSnapshot.BYTE_SIZE);
    snapshot.writeTo(segment);
    SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(
        readerPtr,
        runtimePtr,
        context.getTableName(),
        contextId,
        segment.address()
    );
    return new DataFusionSessionState(sessionCtxHandle);
}
Arena Leak

In createExecutionContext, an Arena is opened and then closed via arena.close() after createSessionContext. However, if NativeBridge.createSessionContext throws an exception, arena.close() is never called, leaking off-heap memory. The arena should be managed with try-with-resources or a try-finally block.

Arena arena = Arena.ofConfined();
MemorySegment configSegment = arena.allocate(WireConfigSnapshot.BYTE_SIZE);
WireConfigSnapshot.builder().build().writeTo(configSegment);
SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(
    readerHandle.getPointer(),
    runtimeHandle.get(),
    tableName,
    0L,
    configSegment.address()
);
arena.close();
Hardcoded Values

Several fields written in writeTo are hardcoded constants (indexed_pushdown_filters=1, force_strategy=-1, force_pushdown=-1, cost_predicate=1, cost_collector=10). These are not exposed as settings and cannot be tuned at runtime. If these values ever need to change, a code change and redeployment is required. Consider whether any of these should also be dynamic settings, or at minimum document clearly why they are intentionally fixed.

// Offset 36: indexed_pushdown_filters (i32) — hardcoded 1
segment.set(ValueLayout.JAVA_INT, 36, 1);
// Offset 40: force_strategy (i32) — always -1 (None)
segment.set(ValueLayout.JAVA_INT, 40, -1);
// Offset 44: force_pushdown (i32) — always -1 (None)
segment.set(ValueLayout.JAVA_INT, 44, -1);
// Offset 48: cost_predicate (i32) — hardcoded 1
segment.set(ValueLayout.JAVA_INT, 48, 1);
// Offset 52: cost_collector (i32) — hardcoded 10
segment.set(ValueLayout.JAVA_INT, 52, 10);
// Offset 56: max_collector_parallelism (i32)
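
As a hedged illustration of this fixed-offset serialization — using java.nio.ByteBuffer as a stand-in for the FFM MemorySegment API, and reproducing only the offsets quoted above rather than the full 68-byte layout — the writes look like:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical ByteBuffer analogue of writeTo: each field lands at a fixed
// byte offset matching the native #[repr(C)] struct layout.
final class WireConfigSketch {
    static ByteBuffer serialize() {
        ByteBuffer buf = ByteBuffer.allocate(68).order(ByteOrder.nativeOrder());
        buf.putInt(36, 1);   // indexed_pushdown_filters — hardcoded 1
        buf.putInt(40, -1);  // force_strategy — always -1 (None)
        buf.putInt(44, -1);  // force_pushdown — always -1 (None)
        buf.putInt(48, 1);   // cost_predicate — hardcoded 1
        buf.putInt(52, 10);  // cost_collector — hardcoded 10
        return buf;
    }
}
```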

Contributor

github-actions Bot commented May 6, 2026

PR Code Suggestions ✨

Latest suggestions up to 636c8bf

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Accidental removal of UDF registration

The removal of crate::udf::register_all(&ctx) means UDFs (including the
index_filter(...) UDF marker referenced in the codebase comments) will no longer be
registered in the session context. This will cause queries that rely on these UDFs
to fail at runtime. Verify this removal is intentional and that UDF registration is
handled elsewhere before merging.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [102-103]

     let ctx = SessionContext::new_with_state(state);
-    crate::udf::register_all(&ctx);
+    crate::udf::register_all(&ctx);
Suggestion importance[1-10]: 7


Why: The removal of crate::udf::register_all(&ctx) could break queries relying on UDFs like index_filter(...). However, this may be intentional if UDF registration was moved elsewhere, so the score is moderate rather than critical.

Medium
Potential use-after-free with native pointer

The Arena is closed (and its memory freed) immediately after createSessionContext
returns, but the native code receives a raw pointer (segment.address()) to that
memory. If the native df_create_session_context stores or dereferences the pointer
after the Java try block exits, this is a use-after-free. Ensure the native function
only reads the config synchronously within the call, or extend the arena's lifetime
to cover the native call's full use of the pointer.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ShardScanInstructionHandler.java [58-70]

     WireConfigSnapshot snapshot = plugin.getDatafusionSettings().getSnapshot();
     try (Arena arena = Arena.ofConfined()) {
         MemorySegment segment = arena.allocate(WireConfigSnapshot.BYTE_SIZE);
         snapshot.writeTo(segment);
         SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(
             readerPtr,
             runtimePtr,
             context.getTableName(),
             contextId,
             segment.address()
         );
+        // Arena must remain open until native call fully completes and no longer references the pointer
         return new DataFusionSessionState(sessionCtxHandle);
     }
Suggestion importance[1-10]: 7


Why: The Arena is closed immediately after createSessionContext returns, but if the native code stores the pointer for later use, this would be a use-after-free. The improved_code is essentially the same as existing_code with just a comment added, so the suggestion is more of a verification concern. The native df_create_session_context appears to decode the config synchronously (calling from_ffm_ptr which reads immediately), making this likely safe, but it warrants verification.

Medium
Zero target partitions on single-CPU systems

When availableProcessors() returns 1 (e.g., in constrained environments), integer
division by 2 yields 0, which would set target_partitions to 0. This could cause
downstream issues in DataFusion where 0 partitions is invalid. Add a Math.max(1,
...) guard to ensure at least 1 partition is always returned.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [339-341]

     if (maxSliceCount == 0) {
-        return Runtime.getRuntime().availableProcessors() / 2;
+        return Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
     }
Suggestion importance[1-10]: 7


Why: On single-CPU systems, availableProcessors() / 2 yields 0, which could cause DataFusion to fail with 0 partitions. Adding Math.max(1, ...) is a simple and correct fix to prevent this edge case.

Medium
Null pointer causes node crash instead of graceful fallback

Changing from a graceful fallback to a hard assert! (panic) on null pointer means
any Java-side bug that passes 0 will crash the entire node process rather than
returning an error. Consider returning a Result or using the fallback defaults and
logging an error, so a misconfigured call degrades gracefully instead of taking down
the node.

sandbox/plugins/analytics-backend-datafusion/rust/src/datafusion_query_config.rs [123-130]

     pub unsafe fn from_ffm_ptr(ptr: i64) -> Self {
-        assert!(
-            ptr != 0,
-            "from_ffm_ptr: null query config pointer — Java must always provide a valid config"
-        );
+        if ptr == 0 {
+            log::error!("from_ffm_ptr: null query config pointer — using fallback defaults");
+            return Self::fallback();
+        }
         let wire = &*(ptr as *const WireDatafusionQueryConfig);
         Self::from_wire(wire)
     }
Suggestion importance[1-10]: 6

__

Why: Changing from a graceful fallback to a hard assert! (panic) on null pointer means any Java-side bug passing 0 will crash the node process. Using fallback() with an error log would be safer for production resilience, though the PR intentionally enforces that Java always provides a valid pointer.

Low
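
A self-contained sketch of the suggested non-panicking decode path (the struct fields and names below are simplified placeholders, not the actual WireDatafusionQueryConfig layout):

```rust
// Hypothetical wire struct; the real one carries the full 68-byte layout.
#[repr(C)]
struct WireConfig {
    target_partitions: i32,
}

struct QueryConfig {
    target_partitions: usize,
}

impl QueryConfig {
    fn fallback() -> Self {
        Self { target_partitions: 1 }
    }

    /// Decode from a raw FFM pointer; fall back to defaults on null
    /// instead of panicking and taking down the node.
    unsafe fn from_ptr(ptr: i64) -> Self {
        if ptr == 0 {
            return Self::fallback(); // graceful degradation, no crash
        }
        let wire = unsafe { &*(ptr as *const WireConfig) };
        Self { target_partitions: wire.target_partitions.max(1) as usize }
    }
}
```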
Prevent premature drop of pool resource

The _pool variable is immediately dropped after DynamicLimitPool::new(0), which
means the pool itself is deallocated while handle still references it. If
DynamicLimitHandle holds a weak or raw reference back to the pool, this could cause
undefined behavior or panics at runtime. The pool should be kept alive for the
lifetime of the DataFusionRuntime, either by storing it in the struct or using an
Arc-based design.

sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs [121-128]

 pub fn new_for_bench(runtime_env: datafusion::execution::runtime_env::RuntimeEnv) -> Self {
     let (_pool, handle) = DynamicLimitPool::new(0);
+    // Ensure _pool lives as long as the runtime if DynamicLimitHandle holds a reference to it.
+    // Consider storing _pool in the struct or using Arc to keep it alive.
     Self {
         runtime_env,
         custom_cache_manager: None,
         dynamic_limit_handle: handle,
     }
 }
Suggestion importance[1-10]: 3

__

Why: The concern about _pool being dropped is potentially valid, but the improved_code is identical to the existing_code (just adds a comment), making it not actionable. The actual fix would require storing _pool in the struct, which isn't demonstrated. Additionally, DynamicLimitPool::new(0) likely uses Arc internally, making this a non-issue in practice.

Low
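
If DynamicLimitPool does share its state through an Arc, dropping the pool wrapper is indeed harmless; a hedged sketch of that design (names reused for illustration, internals assumed, not the crate's actual code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct PoolState {
    limit: AtomicUsize,
}

// Pool and handle share ownership of the state via Arc, so dropping the
// pool wrapper does not free what the handle still references.
struct DynamicLimitPool {
    state: Arc<PoolState>,
}

#[derive(Clone)]
struct DynamicLimitHandle {
    state: Arc<PoolState>,
}

impl DynamicLimitPool {
    fn new(limit: usize) -> (Self, DynamicLimitHandle) {
        let state = Arc::new(PoolState { limit: AtomicUsize::new(limit) });
        (Self { state: Arc::clone(&state) }, DynamicLimitHandle { state })
    }
}

impl DynamicLimitHandle {
    fn limit(&self) -> usize {
        self.state.limit.load(Ordering::Relaxed)
    }
}
```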
General
Add assertion to validate partition count

The target_partitions field is now read directly from query_config inside
execute_indexed_query, whereas the caller in api.rs previously applied .max(1)
before passing it. The .max(1) clamp is preserved here and correctly handles a
zero value whether the type is signed or unsigned (e.g., 0usize.max(1) is 1),
but it's worth ensuring the type semantics are consistent with the builder's
validation to avoid silent misconfiguration.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs [93]

 let num_partitions = query_config.target_partitions.max(1);
+debug_assert!(num_partitions >= 1, "target_partitions must be at least 1");
Suggestion importance[1-10]: 2

__

Why: Adding a debug_assert! after .max(1) is redundant since .max(1) already guarantees num_partitions >= 1. The suggestion provides minimal value and the assertion would always pass.

Low

Previous suggestions

Suggestions up to commit e789a8c
Category | Suggestion | Impact
Possible issue
Fix race condition in concurrent settings updates

The two listeners for maxSliceCount and concurrentSearchMode each read the other's
volatile field and then write snapshot, creating a race condition. If both settings
are updated concurrently, one listener may read a stale value of the other field
before the other listener has updated it, resulting in an incorrect targetPartitions
in the snapshot. Consider using synchronized blocks or an AtomicReference with a
compare-and-swap loop to ensure both fields are read and the snapshot is rebuilt
atomically.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [284-294]

     clusterSettings.addSettingsUpdateConsumer(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, newValue -> {
-        this.maxSliceCount = newValue;
-        snapshot = WireConfigSnapshot.builder(snapshot)
-            .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
-            .build();
+        synchronized (this) {
+            this.maxSliceCount = newValue;
+            snapshot = WireConfigSnapshot.builder(snapshot)
+                .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
+                .build();
+        }
     });
 
     clusterSettings.addSettingsUpdateConsumer(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE, newValue -> {
-        this.concurrentSearchMode = newValue;
-        snapshot = WireConfigSnapshot.builder(snapshot).targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount)).build();
+        synchronized (this) {
+            this.concurrentSearchMode = newValue;
+            snapshot = WireConfigSnapshot.builder(snapshot).targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount)).build();
+        }
     });
Suggestion importance[1-10]: 7


Why: The race condition between maxSliceCount and concurrentSearchMode listeners is a real concurrency issue — each listener reads the other's volatile field and writes snapshot non-atomically. However, in practice OpenSearch's cluster settings updates are typically serialized, so the impact may be limited. The synchronized fix is correct and addresses a genuine bug.

Medium
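
A sketch of the AtomicReference alternative mentioned in the suggestion, folding both fields into one immutable state object so a concurrent update can never pair a new value with a stale sibling (names illustrative, not the PR's actual code):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical CAS-based alternative: both settings live in one immutable
// object, and each listener derives the new state from the current one.
final class SettingsState {
    final int maxSliceCount;
    final String concurrentSearchMode;
    SettingsState(int maxSliceCount, String mode) {
        this.maxSliceCount = maxSliceCount;
        this.concurrentSearchMode = mode;
    }
}

final class AtomicSettings {
    private final AtomicReference<SettingsState> state =
        new AtomicReference<>(new SettingsState(0, "auto"));

    void updateMaxSliceCount(int newValue) {
        // updateAndGet retries on contention, so the sibling field read
        // inside the lambda is always consistent with the published state.
        state.updateAndGet(s -> new SettingsState(newValue, s.concurrentSearchMode));
    }

    void updateMode(String newMode) {
        state.updateAndGet(s -> new SettingsState(s.maxSliceCount, newMode));
    }

    SettingsState current() { return state.get(); }
}
```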
Guard against zero target partitions on single-core hosts

When availableProcessors() returns 1 (single-core environment), integer division by
2 yields 0, which would set target_partitions to 0. This could cause downstream
issues in DataFusion where 0 partitions is invalid. Add a Math.max(1, ...) guard to
ensure at least 1 partition is always returned.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [339-341]

     if (maxSliceCount == 0) {
-        return Runtime.getRuntime().availableProcessors() / 2;
+        return Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
     }
Suggestion importance[1-10]: 6


Why: On a single-core host, availableProcessors() / 2 returns 0, which could cause DataFusion to receive an invalid target_partitions value of 0. The Math.max(1, ...) guard is a simple and correct fix for this edge case.

Low
Prevent premature drop of pool handle

The _pool returned by DynamicLimitPool::new(0) is immediately dropped, which may
cause the dynamic limit pool to be deallocated while handle still references it. If
DynamicLimitPool uses weak references or shared state, dropping the pool could lead
to unexpected behavior or panics. Consider storing the pool in the struct or using a
sentinel value that keeps it alive.

sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs [122]

-let (_pool, handle) = DynamicLimitPool::new(0);
+let (pool, handle) = DynamicLimitPool::new(0);
+let _ = std::mem::ManuallyDrop::new(pool); // or store pool in the struct
Suggestion importance[1-10]: 4


Why: The concern about _pool being dropped is valid in principle, but the improved_code uses ManuallyDrop which is not a proper solution and could cause memory leaks. The actual fix would depend on DynamicLimitPool's internal design. The suggestion raises a legitimate concern but the proposed fix is not ideal.

Low
General
Verify intentional removal of UDF registration

The removal of crate::udf::register_all(&ctx) means UDFs (including the
index_filter(...) UDF marker referenced in the IndexedTableConfig comment) are no
longer registered in the session context. If any query relies on these UDFs being
present, it will fail at runtime. Confirm this removal is intentional and that all
callers have been updated accordingly.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [102-103]

     let ctx = SessionContext::new_with_state(state);
-    crate::udf::register_all(&ctx);
+    // Ensure UDFs are registered if any query path requires them.
+    // crate::udf::register_all(&ctx);
Suggestion importance[1-10]: 4


Why: The removal of crate::udf::register_all(&ctx) could break queries relying on UDFs, but the improved_code just comments out the line rather than restoring it, making it essentially a verification request. The concern is valid but the suggestion doesn't provide a definitive fix.

Low
Document memory lifetime assumption for native pointer usage

The Arena is closed at the end of the try-with-resources block, which frees the
MemorySegment. However, segment.address() passes the raw pointer to the native side.
If the native createSessionContext call reads the pointer asynchronously or stores
it beyond the call duration, the memory will have been freed. Verify that the native
function fully consumes the config pointer synchronously before the arena is closed;
if not, the arena must remain open until the native call completes, which it does
here only if the call is fully synchronous within NativeBridge.createSessionContext.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ShardScanInstructionHandler.java [58-70]

     WireConfigSnapshot snapshot = plugin.getDatafusionSettings().getSnapshot();
     try (Arena arena = Arena.ofConfined()) {
         MemorySegment segment = arena.allocate(WireConfigSnapshot.BYTE_SIZE);
         snapshot.writeTo(segment);
+        // NOTE: createSessionContext must fully consume queryConfigPtr before returning,
+        // as the Arena (and thus the segment) is closed immediately after this call.
         SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(
             readerPtr,
             runtimePtr,
             context.getTableName(),
             contextId,
             segment.address()
         );
         return new DataFusionSessionState(sessionCtxHandle);
     }
Suggestion importance[1-10]: 3


Why: The suggestion only asks to verify/document an assumption rather than fix actual code. The improved_code is essentially the same as existing_code with just a comment added, which doesn't address a real bug — it's just a documentation concern.

Low
Ensure safe type conversion for partition count

If query_config.target_partitions is 0, max(1) correctly clamps it to 1, and the
clamp also covers a negative value if the field is signed; still, it's worth
ensuring the type is unsigned or adding an explicit check. More importantly,
verify that query_config is not cloned/moved before this line if it's used
later, since Arc is passed by value.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs [93]

-let num_partitions = query_config.target_partitions.max(1);
+let num_partitions = query_config.target_partitions.max(1) as usize;
Suggestion importance[1-10]: 2


Why: The suggestion to add as usize cast is minor and speculative — if target_partitions is already usize, the cast is redundant. The suggestion asks to verify types rather than addressing a confirmed issue, making it low impact.

Low
Suggestions up to commit 9ecab74
Category | Suggestion | Impact
Possible issue
Guard against zero target partitions on single-core systems

When availableProcessors() returns 1, dividing by 2 yields 0, which would set
target_partitions to 0. This could cause downstream issues in DataFusion if it
expects at least 1 partition. Add a Math.max(1, ...) guard to ensure the result is
always at least 1.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [339-341]

 if (maxSliceCount == 0) {
-    return Runtime.getRuntime().availableProcessors() / 2;
+    return Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
 }
Suggestion importance[1-10]: 7


Why: On a single-core system, availableProcessors() / 2 returns 0, which would set target_partitions to 0 and could cause downstream failures in DataFusion. The Math.max(1, ...) guard is a simple and correct fix for a real edge case.

Medium
Pass dynamic settings snapshot to native session context

The queryConfigPtr is hardcoded to 0L (fallback defaults), meaning the dynamic
settings managed by DatafusionSettings are never actually passed to the native
layer. The DatafusionSettings instance should be used here to write the current
snapshot into an Arena-allocated MemorySegment and pass its address as
queryConfigPtr.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ShardScanInstructionHandler.java [55-61]

-SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(
-    readerPtr,
-    runtimePtr,
-    context.getTableName(),
-    contextId,
-    0L
-);
+WireConfigSnapshot snapshot = dataFusionService.getDatafusionSettings().getSnapshot();
+try (Arena arena = Arena.ofConfined()) {
+    MemorySegment configSegment = arena.allocate(WireConfigSnapshot.BYTE_SIZE);
+    snapshot.writeTo(configSegment);
+    SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(
+        readerPtr,
+        runtimePtr,
+        context.getTableName(),
+        contextId,
+        configSegment.address()
+    );
+    return new DataFusionSessionState(sessionCtxHandle);
+}
Suggestion importance[1-10]: 7


Why: The queryConfigPtr is hardcoded to 0L, meaning all the dynamic settings infrastructure added in this PR (DatafusionSettings, WireConfigSnapshot) is never actually used on the query hot path. This is a significant functional gap, though the Arena lifetime management in the improved code needs careful consideration since the Arena would be closed before the native call completes.

Medium
Builder method cfg-gating may cause compilation failures

The builder() method is gated with #[cfg(test)], but it is used in non-test code in
several files (e.g., multi_segment.rs, streaming_at_scale.rs, schema_drift.rs,
null_columns.rs, session_context.rs). If those call sites are in #[cfg(test)]
modules themselves this may compile, but the builder and its methods should be
consistently scoped. Verify all call sites are within test-only code, or remove the
#[cfg(test)] gate from builder() and DatafusionQueryConfigBuilder.

sandbox/plugins/analytics-backend-datafusion/rust/src/datafusion_query_config.rs [110-113]

-    #[cfg(test)]
     pub fn builder() -> DatafusionQueryConfigBuilder {
         DatafusionQueryConfigBuilder::new()
     }
Suggestion importance[1-10]: 7


Why: The builder() method and DatafusionQueryConfigBuilder are gated with #[cfg(test)], but they are used in multiple test files. If all call sites are within #[cfg(test)] modules, this compiles fine, but the suggestion raises a valid concern about consistency. The PR shows the builder is used in test-only contexts, so this is a moderate concern worth verifying.

Medium
Fix race condition in concurrent settings update listeners

The two listeners for maxSliceCount and concurrentSearchMode each read the other's
volatile field and then write snapshot, creating a check-then-act race condition. If
both settings are updated concurrently, one update could overwrite the other's
snapshot using a stale value of the sibling field. Consider using synchronized
blocks or an atomic compare-and-swap approach to make the read-modify-write of both
fields and the snapshot atomic.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [284-294]

 clusterSettings.addSettingsUpdateConsumer(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, newValue -> {
-    this.maxSliceCount = newValue;
-    snapshot = WireConfigSnapshot.builder(snapshot)
-        .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
-        .build();
+    synchronized (this) {
+        this.maxSliceCount = newValue;
+        snapshot = WireConfigSnapshot.builder(snapshot)
+            .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
+            .build();
+    }
 });
 
 clusterSettings.addSettingsUpdateConsumer(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE, newValue -> {
-    this.concurrentSearchMode = newValue;
-    snapshot = WireConfigSnapshot.builder(snapshot).targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount)).build();
+    synchronized (this) {
+        this.concurrentSearchMode = newValue;
+        snapshot = WireConfigSnapshot.builder(snapshot)
+            .targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount))
+            .build();
+    }
 });
Suggestion importance[1-10]: 6


Why: The two listeners for maxSliceCount and concurrentSearchMode have a read-modify-write race: each reads the other's volatile field and writes snapshot non-atomically, so concurrent updates could produce a snapshot with a stale sibling value. Using synchronized blocks as suggested would eliminate this race condition.

Low
Null pointer now panics instead of using safe fallback

Changing from_ffm_ptr to panic on null pointer (instead of returning defaults) is a
breaking behavioral change. If any existing Java caller passes 0 during a
transitional period (as the doc comment acknowledges), this will cause a hard crash
in production rather than a graceful fallback. Consider returning Self::fallback()
on null during the transition, or at minimum use a non-panicking error return path.

sandbox/plugins/analytics-backend-datafusion/rust/src/datafusion_query_config.rs [124-127]

-    assert!(
-        ptr != 0,
-        "from_ffm_ptr: null query config pointer — Java must always provide a valid config"
-    );
+    if ptr == 0 {
+        return Self::fallback();
+    }
Suggestion importance[1-10]: 6

__

Why: Changing from a graceful fallback to a hard panic on null pointer is a breaking behavioral change that could crash production if Java passes 0 during a transitional period. The doc comment itself acknowledges a transitional period, making this a real risk worth addressing.

Low
UDF registration removal may break queries at runtime

The call to crate::udf::register_all(&ctx) was removed from execute_indexed_query.
If this function registers UDFs (like convert_tz) that are needed by indexed
queries, removing it could cause query failures at runtime when those UDFs are
referenced in plans. Verify that all required UDFs are still registered through
another path before finalizing this removal.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs [135]

     ctx.register_udf(create_index_filter_udf());
-    crate::udf::register_all(&ctx);
+    crate::udf::register_all(&ctx);
Suggestion importance[1-10]: 5


Why: The removal of crate::udf::register_all(&ctx) from execute_indexed_query could cause runtime failures if indexed queries reference UDFs registered by that call. However, the suggestion asks to verify rather than confirming a definite bug, and the same removal was done in session_context.rs, suggesting it may be intentional.

Low
General
Success message logged at wrong severity level

A successful registration is logged at the error! level, which is semantically
incorrect and will pollute error logs with non-error events. This should use info!
or debug! instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [137-141]

-    error!(
+    info!(
         "create_session_context: successfully registered table '{}', table_name_len={}",
         table_name,
         table_name.len()
     );
Suggestion importance[1-10]: 6


Why: Using error! to log a successful table registration is semantically incorrect and will pollute error logs. This should use info! or debug! instead, which is a clear and impactful correctness issue for observability.

Low
Suggestions up to commit 252a1fc
Category | Suggestion | Impact
Possible issue
Null config pointer causes panic in FFM entry point

from_ffm_ptr now panics on a null pointer, but ShardScanInstructionHandler passes 0L
as queryConfigPtr. This will cause a panic at runtime for every non-indexed query
that goes through df_create_session_context. The FFM entry point should handle the
null case gracefully (e.g., fall back to DatafusionQueryConfig::fallback()) rather
than panicking, or the Java call site must always supply a valid pointer.

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs [467-468]

-let query_config =
-    crate::datafusion_query_config::DatafusionQueryConfig::from_ffm_ptr(query_config_ptr);
+let query_config = if query_config_ptr == 0 {
+    crate::datafusion_query_config::DatafusionQueryConfig::test_default()
+} else {
+    unsafe { crate::datafusion_query_config::DatafusionQueryConfig::from_ffm_ptr(query_config_ptr) }
+};
Suggestion importance[1-10]: 8

__

Why: from_ffm_ptr now panics on null (0), but ShardScanInstructionHandler passes 0L as queryConfigPtr. This creates a real runtime panic for every call through df_create_session_context with a null config pointer. The fix to handle null gracefully is accurate and important.

Medium
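The safest way to avoid the null-pointer case entirely is for the Java call site to always serialize a snapshot before crossing the FFM boundary. A minimal sketch of writing a fixed-layout config buffer with `ByteBuffer` — field names, order, and offsets here are hypothetical; the real 68-byte layout is dictated by the Rust `#[repr(C)]` struct and must match it byte-for-byte:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class WireConfigBytes {
    static final int SNAPSHOT_SIZE = 68; // must match the Rust #[repr(C)] struct exactly

    // Serialize a (hypothetical) settings snapshot into the wire buffer.
    static ByteBuffer serialize(int batchSize, int targetPartitions, boolean pushdownFilters) {
        ByteBuffer buf = ByteBuffer.allocate(SNAPSHOT_SIZE).order(ByteOrder.nativeOrder());
        buf.putInt(batchSize);                     // offset 0
        buf.putInt(targetPartitions);              // offset 4
        buf.put((byte) (pushdownFilters ? 1 : 0)); // offset 8
        // ... remaining fields up to 68 bytes ...
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = serialize(8192, 4, true);
        System.out.println(buf.capacity()); // 68
        System.out.println(buf.getInt(0));  // 8192
        System.out.println(buf.get(8));     // 1
    }
}
```

The real code hands the Rust side a native address (via FFM) rather than a heap `ByteBuffer`; the point is only that the caller, not `from_ffm_ptr`, owns the non-null guarantee.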
Guard against zero target partitions on single-core hosts

When availableProcessors() returns 1 (e.g., in a single-core CI environment),
availableProcessors() / 2 evaluates to 0, which would set target_partitions to 0.
The Rust side does query_config.target_partitions.max(1) in some places but not all,
so passing 0 could cause unexpected behavior. Add a Math.max(1, ...) guard here.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [339-341]

-private static int deriveTargetPartitions(String mode, int maxSliceCount) {
-    if (SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_NONE.equals(mode)) {
-        return 1;
-    }
-
-    // For maxSliceCount == 0 also, we will be owning the concurrency level
-    if (maxSliceCount == 0) {
-        return Runtime.getRuntime().availableProcessors() / 2;
-    }
-    ...
+if (maxSliceCount == 0) {
+    return Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
 }
Suggestion importance[1-10]: 6

__

Why: On single-core hosts, availableProcessors() / 2 returns 0, which could cause issues in Rust code that doesn't always apply .max(1). The Math.max(1, ...) guard is a valid defensive fix, though the Rust side does apply .max(1) in several places.

Low
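The guard can be exercised in isolation. A runnable sketch — the method shape and mode constant are assumptions mirroring the suggestion, with the processor count parameterized so the single-core case is testable:

```java
public class DeriveTargetPartitions {
    static final String MODE_NONE = "none"; // assumed value of CONCURRENT_SEGMENT_SEARCH_MODE_NONE

    // Derive target_partitions from the concurrent-search mode and slice count,
    // guarding against 0 on single-core hosts.
    static int deriveTargetPartitions(String mode, int maxSliceCount, int availableProcessors) {
        if (MODE_NONE.equals(mode)) {
            return 1; // concurrent segment search disabled: single partition
        }
        if (maxSliceCount == 0) {
            // 0 means "we own the concurrency level": half the cores, but never 0
            return Math.max(1, availableProcessors / 2);
        }
        return maxSliceCount;
    }

    public static void main(String[] args) {
        // Single-core host: without the Math.max guard this would be 0
        System.out.println(deriveTargetPartitions("auto", 0, 1)); // 1
        System.out.println(deriveTargetPartitions("auto", 0, 8)); // 4
        System.out.println(deriveTargetPartitions("none", 6, 8)); // 1
        System.out.println(deriveTargetPartitions("auto", 6, 8)); // 6
    }
}
```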
Race condition in concurrent settings update listeners

The two listeners that update targetPartitions each read the other's volatile field
and then write snapshot — a classic check-then-act race. If both settings are
updated concurrently, one listener may read a stale value of concurrentSearchMode or
maxSliceCount and produce an incorrect snapshot. Consider synchronizing these two
listeners or combining them into a single addSettingsUpdateConsumer call that takes
both settings atomically.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [284-294]

 clusterSettings.addSettingsUpdateConsumer(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, newValue -> {
-    this.maxSliceCount = newValue;
-    snapshot = WireConfigSnapshot.builder(snapshot)
-        .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
-        .build();
+    synchronized (this) {
+        this.maxSliceCount = newValue;
+        snapshot = WireConfigSnapshot.builder(snapshot)
+            .targetPartitions(deriveTargetPartitions(this.concurrentSearchMode, newValue))
+            .build();
+    }
 });
 
 clusterSettings.addSettingsUpdateConsumer(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE, newValue -> {
-    this.concurrentSearchMode = newValue;
-    snapshot = WireConfigSnapshot.builder(snapshot).targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount)).build();
+    synchronized (this) {
+        this.concurrentSearchMode = newValue;
+        snapshot = WireConfigSnapshot.builder(snapshot)
+            .targetPartitions(deriveTargetPartitions(newValue, this.maxSliceCount))
+            .build();
+    }
 });
Suggestion importance[1-10]: 5

__

Why: There is a theoretical race between the two listeners updating concurrentSearchMode/maxSliceCount and snapshot, but in practice OpenSearch's cluster settings updates are serialized. The suggestion is technically valid but the risk is low in the actual execution environment.

Low
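An alternative to synchronizing the two listeners separately is to funnel both updates through one synchronized rebuild, so the derived value always sees a consistent (mode, sliceCount) pair. A minimal sketch under assumed names — not the actual DatafusionSettings API:

```java
public class CombinedListeners {
    // Stand-in for the immutable WireConfigSnapshot (field set is illustrative).
    static final class Snapshot {
        final int targetPartitions;
        Snapshot(int targetPartitions) { this.targetPartitions = targetPartitions; }
    }

    private volatile Snapshot snapshot = new Snapshot(1);
    private String concurrentSearchMode = "auto";
    private int maxSliceCount = 0;

    // Both settings listeners delegate here; the lock makes the
    // read-modify-write over (mode, sliceCount, snapshot) atomic.
    synchronized void onModeUpdate(String newMode) {
        this.concurrentSearchMode = newMode;
        rebuild();
    }

    synchronized void onSliceCountUpdate(int newCount) {
        this.maxSliceCount = newCount;
        rebuild();
    }

    private void rebuild() {
        int tp;
        if ("none".equals(concurrentSearchMode)) {
            tp = 1;
        } else if (maxSliceCount == 0) {
            tp = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
        } else {
            tp = maxSliceCount;
        }
        snapshot = new Snapshot(tp);
    }

    Snapshot snapshot() { return snapshot; } // hot path: one volatile read

    public static void main(String[] args) {
        CombinedListeners s = new CombinedListeners();
        s.onSliceCountUpdate(6);
        System.out.println(s.snapshot().targetPartitions); // 6
        s.onModeUpdate("none");
        System.out.println(s.snapshot().targetPartitions); // 1
    }
}
```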
General
Verify no duplicate listener registration for memory pool setting

DatafusionSettings registers its own addSettingsUpdateConsumer for the indexed
settings, but DataFusionPlugin also registers a separate consumer for
DATAFUSION_MEMORY_POOL_LIMIT earlier in createComponents. The memory pool limit
setting is included in DatafusionSettings.ALL_SETTINGS (via
DataFusionPlugin.DATAFUSION_MEMORY_POOL_LIMIT), but DatafusionSettings does not
register a listener for it. This means the existing updateMemoryPoolLimit listener
registered before this line is the only handler — verify that DatafusionSettings
does not accidentally shadow or duplicate this registration, and that
DATAFUSION_MEMORY_POOL_LIMIT is not expected to be handled inside
DatafusionSettings.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [144]

+// Ensure DatafusionSettings does not register a duplicate listener for DATAFUSION_MEMORY_POOL_LIMIT.
+// The memory pool update is handled separately above via updateMemoryPoolLimit.
 this.datafusionSettings = new DatafusionSettings(clusterService);
Suggestion importance[1-10]: 2

__

Why: This is a verification suggestion asking to confirm no duplicate listener exists. The DatafusionSettings class does not register a listener for DATAFUSION_MEMORY_POOL_LIMIT (only for indexed settings), so there is no actual duplication issue to fix. The improved code just adds a comment.

Low
Suggestions up to commit 83c7e3b
Suggestions up to commit 0b4f76a
Category | Suggestion | Impact
Possible issue
Protect snapshot updates from race conditions

The registerListeners method performs non-atomic read-modify-write operations on the
volatile snapshot field. Between reading the current snapshot and writing the new
one, another thread could update it, causing lost updates. Use a synchronized block
or atomic compare-and-set pattern to ensure atomicity of snapshot updates.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [255-282]

 void registerListeners(ClusterSettings clusterSettings) {
     clusterSettings.addSettingsUpdateConsumer(INDEXED_BATCH_SIZE, newValue -> {
-        snapshot = WireConfigSnapshot.builder(snapshot).batchSize(newValue).build();
+        synchronized (this) {
+            snapshot = WireConfigSnapshot.builder(snapshot).batchSize(newValue).build();
+        }
     });
 
     clusterSettings.addSettingsUpdateConsumer(INDEXED_PARQUET_PUSHDOWN_FILTERS, newValue -> {
-        snapshot = WireConfigSnapshot.builder(snapshot).parquetPushdownFilters(newValue).build();
+        synchronized (this) {
+            snapshot = WireConfigSnapshot.builder(snapshot).parquetPushdownFilters(newValue).build();
+        }
     });
-    ...
+    // ... apply same pattern to all other listeners
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential race condition in the registerListeners method where multiple threads could perform non-atomic read-modify-write operations on the volatile snapshot field. However, the fix using synchronized(this) may not be the optimal approach since ClusterSettings callbacks are typically invoked serially by the framework. The concern is valid but the severity is moderate since the framework likely provides ordering guarantees. The suggestion is accurate but could benefit from verifying framework behavior.

Medium
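The pattern at stake can be sketched in isolation: an immutable snapshot rebuilt under a lock by the settings listeners, read with a single volatile load on the query hot path. Field names and `with*` methods are illustrative, not the actual WireConfigSnapshot API:

```java
public class ImmutableSnapshot {
    // Immutable snapshot: writers build a fresh instance, readers never see torn state.
    static final class Snapshot {
        final int batchSize;
        final boolean pushdownFilters;
        Snapshot(int batchSize, boolean pushdownFilters) {
            this.batchSize = batchSize;
            this.pushdownFilters = pushdownFilters;
        }
        Snapshot withBatchSize(int v) { return new Snapshot(v, pushdownFilters); }
        Snapshot withPushdown(boolean v) { return new Snapshot(batchSize, v); }
    }

    private volatile Snapshot snapshot = new Snapshot(8192, true);
    private final Object lock = new Object();

    // Each listener's read-modify-write happens under one lock, so
    // concurrent updates cannot lose each other's fields.
    void updateBatchSize(int v) {
        synchronized (lock) { snapshot = snapshot.withBatchSize(v); }
    }

    void updatePushdown(boolean v) {
        synchronized (lock) { snapshot = snapshot.withPushdown(v); }
    }

    Snapshot getSnapshot() { return snapshot; } // hot path: single volatile read

    public static void main(String[] args) {
        ImmutableSnapshot s = new ImmutableSnapshot();
        s.updateBatchSize(4096);
        s.updatePushdown(false);
        System.out.println(s.getSnapshot().batchSize);
        System.out.println(s.getSnapshot().pushdownFilters);
    }
}
```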
Validate pointer before unsafe dereference

The from_ffm_ptr method dereferences a raw pointer without validation. If
query_config_ptr is null or invalid, this will cause undefined behavior or a
segmentation fault. Add pointer validation before dereferencing or ensure the caller
guarantees a valid pointer with documentation.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [90]

+if query_config_ptr == 0 {
+    return Err(DataFusionError::Execution("Invalid query_config_ptr: null pointer".to_string()));
+}
 let query_config = crate::datafusion_query_config::DatafusionQueryConfig::from_ffm_ptr(query_config_ptr);
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a safety issue in unsafe Rust code where a raw pointer is dereferenced without validation. Adding a null pointer check before dereferencing is a valid defensive practice. However, the Java side (NativeBridge.createSessionContext) already validates and allocates the memory segment, so the pointer should be guaranteed valid. The suggestion is technically sound but may be redundant given the caller's responsibility to provide valid pointers.

Medium
Add null check for uninitialized settings

The code calls plugin.getDatafusionSettings() which may return null if
createComponents() has not been called yet. This will cause a NullPointerException
at runtime. Add a null check or ensure the settings are initialized before this code
path is reached.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ShardScanInstructionHandler.java [56-64]

+DatafusionSettings datafusionSettings = plugin.getDatafusionSettings();
+if (datafusionSettings == null) {
+    throw new IllegalStateException("DatafusionSettings not initialized. Ensure DataFusionPlugin.createComponents() has been called.");
+}
 SessionContextConfig config = new SessionContextConfig(
     readerPtr,
     runtimePtr,
     context.getTableName(),
     contextId,
-    plugin.getDatafusionSettings().getSnapshot()
+    datafusionSettings.getSnapshot()
 );
 SessionContextHandle sessionCtxHandle = NativeBridge.createSessionContext(config);
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies a potential null pointer risk when plugin.getDatafusionSettings() is called before createComponents() initializes the field. The proposed null check with a descriptive error message is reasonable defensive programming. However, the severity is moderate since the plugin lifecycle should ensure createComponents() is called before query execution, making this a defensive measure rather than a critical bug fix.

Low
General
Handle initialization failures gracefully

The constructor calls registerListeners() which registers dynamic update consumers.
If an exception occurs during snapshot building but before registerListeners()
completes, the object may be left in an inconsistent state. Move listener
registration before snapshot initialization or wrap the entire initialization in a
try-catch to ensure consistency.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java [214-233]

 public DatafusionSettings(ClusterService clusterService) {
     Settings settings = clusterService.getSettings();
     ClusterSettings clusterSettings = clusterService.getClusterSettings();
 
     this.concurrentSearchMode = SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.get(settings);
     this.maxSliceCount = SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.get(settings);
 
-    this.snapshot = WireConfigSnapshot.builder()
-        ...
-        .build();
+    try {
+        this.snapshot = WireConfigSnapshot.builder()
+            ...
+            .build();
 
-    registerListeners(clusterSettings);
+        registerListeners(clusterSettings);
+    } catch (Exception e) {
+        throw new RuntimeException("Failed to initialize DatafusionSettings", e);
+    }
 }
Suggestion importance[1-10]: 3

__

Why: While the suggestion addresses error handling, wrapping the entire initialization in a try-catch that rethrows as RuntimeException provides minimal value. The existing code already allows exceptions to propagate naturally, which is appropriate for constructor failures. The suggested improvement doesn't meaningfully enhance error handling or recovery, making this a low-impact suggestion.

Low

@github-actions
Contributor

github-actions Bot commented May 6, 2026

Persistent review updated to latest commit f8165e8

@github-actions
Contributor

github-actions Bot commented May 6, 2026

Persistent review updated to latest commit 6a6d8b3

@alchemist51 alchemist51 changed the title Add dynamic settings for datafusion request Add dynamic settings for datafusion queries May 6, 2026
@github-actions
Contributor

github-actions Bot commented May 6, 2026

✅ Gradle check result for 6a6d8b3: SUCCESS

@codecov

codecov Bot commented May 6, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.41%. Comparing base (144386c) to head (6a6d8b3).
⚠️ Report is 6 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21522      +/-   ##
============================================
+ Coverage     73.38%   73.41%   +0.03%     
- Complexity    74380    74429      +49     
============================================
  Files          5970     5970              
  Lines        338267   338267              
  Branches      48753    48753              
============================================
+ Hits         248228   248346     +118     
+ Misses        70237    70172      -65     
+ Partials      19802    19749      -53     

☔ View full report in Codecov by Sentry.

     batch_size: w.batch_size as usize,
-    target_partitions: w.target_partitions as usize,
+    // 0 means "let the runtime decide" — use available CPU parallelism
+    target_partitions: if w.target_partitions == 0 {
Contributor

Not sure if we should do this? This should always come as a defined value from Java.

@alchemist51 alchemist51 force-pushed the opensearch-dynamic branch from 418edc4 to c65850f Compare May 7, 2026 05:37
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit c65850f

@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 9398a2c

@github-actions
Contributor

github-actions Bot commented May 7, 2026

❌ Gradle check result for 9398a2c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@alchemist51 alchemist51 requested a review from bharath-techie May 7, 2026 09:26
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 5b1615b

@github-actions github-actions Bot added labels enhancement (Enhancement or improvement to existing feature or request) and Search (Search query, autocomplete ...etc) May 7, 2026
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@alchemist51 alchemist51 force-pushed the opensearch-dynamic branch from 5b1615b to 0b4f76a Compare May 7, 2026 09:34
@alchemist51 alchemist51 marked this pull request as ready for review May 7, 2026 09:34
@alchemist51 alchemist51 requested a review from a team as a code owner May 7, 2026 09:34
@alchemist51 alchemist51 changed the title Add dynamic settings for datafusion queries Add dynamic settings for indexed query execution path May 7, 2026
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 0b4f76a

@alchemist51 alchemist51 self-assigned this May 7, 2026
})?;

-let query_config = crate::datafusion_query_config::DatafusionQueryConfig::default();
+let query_config = crate::datafusion_query_config::DatafusionQueryConfig::from_ffm_ptr(query_config_ptr);
Contributor

Check everywhere and replace? [Tests are okay.]

@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 83c7e3b

     .build()
     .unwrap();
-let df_runtime = DataFusionRuntime { runtime_env };
+let df_runtime = DataFusionRuntime::new_for_bench(runtime_env);
Contributor Author

Filed #21538 for this.

@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 252a1fc

@github-actions
Contributor

github-actions Bot commented May 7, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 636c8bf.

Path | Line | Severity | Description
sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs | 101 | low | Removal of `crate::udf::register_all(&ctx)` without explanation. If any registered UDFs enforced query-level security constraints or access control, removing their registration silently degrades those controls. The PR description focuses on config wiring, making this deletion an unexplained side effect worth verifying.
sandbox/plugins/analytics-backend-datafusion/rust/src/datafusion_query_config.rs | 107 | low | `from_ffm_ptr` now panics (asserts ptr != 0) instead of returning safe defaults on a null pointer. This is a behavioral regression that turns a previously recoverable condition into a process-terminating crash. While explicitly documented, it expands the crash surface if Java ever passes 0 for any reason, creating a potential availability impact.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 0 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 9ecab74

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@alchemist51 alchemist51 force-pushed the opensearch-dynamic branch from 7e9e6c2 to e789a8c Compare May 7, 2026 12:52
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit e789a8c

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 636c8bf


Labels

enhancement (Enhancement or improvement to existing feature or request), Search (Search query, autocomplete ...etc)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add dynamic cluster settings for DataFusion indexed query execution path

2 participants