-
Notifications
You must be signed in to change notification settings - Fork 4k
GH-34252: [Java] Support ScannerBuilder::Project or ScannerBuilder::Filter as a Substrait proto extended expression #35570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 79 commits
a0aac46
0d91f09
0599dc2
c794ae5
8cc5443
e5594f8
223ddef
795e619
3bd18f1
088a101
ba23e44
8655815
d22d6b1
f0d8a25
632f90d
9437f4e
61d6ee7
64c7607
721fe01
b3c2e1e
f5596c9
388446b
ead80a8
0446453
8c57c16
766b383
5e8b887
7f59fbd
0d2bcf8
4380932
9bbe4fb
5351ee1
e966d32
cfe4061
c7003a1
2419896
8811bc6
ead4784
9bfa15c
8a0eae6
87e75eb
812921f
89060eb
33c634f
34979a5
1a6f0e5
e388be5
8eb3e40
72bbf5d
84dfd44
d632bf1
f21ee31
0da5ed6
b244f8f
f2b0f8a
cf618ae
3e540fe
4099ca6
d74c116
dbb9e7a
93f147d
892dedc
b0fb9a5
ec8230c
9e81af0
dbe2622
d57517e
fb9df5a
b9f6f94
7469725
52c148d
5748a2e
b82b142
35fce8c
1a54100
23117d0
696e9c9
b280863
18eb414
b9e2818
daaa25a
ddd1ad0
9900315
5a4f462
4de37a3
173dbec
a3ddae6
dadc809
0c292cf
faadc50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,8 @@ | |
| #include "arrow/filesystem/path_util.h" | ||
| #include "arrow/filesystem/s3fs.h" | ||
| #include "arrow/engine/substrait/util.h" | ||
| #include "arrow/engine/substrait/serde.h" | ||
| #include "arrow/engine/substrait/relation.h" | ||
| #include "arrow/ipc/api.h" | ||
| #include "arrow/util/iterator.h" | ||
| #include "jni_util.h" | ||
|
|
@@ -200,7 +202,6 @@ arrow::Result<std::shared_ptr<arrow::Schema>> SchemaFromColumnNames( | |
| return arrow::Status::Invalid("Partition column '", ref.ToString(), "' is not in dataset schema"); | ||
| } | ||
| } | ||
|
|
||
| return schema(std::move(columns))->WithMetadata(input->metadata()); | ||
| } | ||
| } // namespace | ||
|
|
@@ -317,6 +318,14 @@ std::shared_ptr<arrow::Table> GetTableByName(const std::vector<std::string>& nam | |
| return it->second; | ||
| } | ||
|
|
||
| std::shared_ptr<arrow::Buffer> LoadArrowBufferFromByteBuffer(JNIEnv* env, jobject byte_buffer) { | ||
| const auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(byte_buffer)); | ||
| int length = env->GetDirectBufferCapacity(byte_buffer); | ||
| std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length)); | ||
| std::memcpy(buffer->mutable_data(), buff, length); | ||
| return buffer; | ||
| } | ||
|
|
||
| /* | ||
| * Class: org_apache_arrow_dataset_jni_NativeMemoryPool | ||
| * Method: getDefaultMemoryPool | ||
|
|
@@ -455,11 +464,12 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset | |
| /* | ||
| * Class: org_apache_arrow_dataset_jni_JniWrapper | ||
| * Method: createScanner | ||
| * Signature: (J[Ljava/lang/String;JJ)J | ||
| * Signature: (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J | ||
| */ | ||
| JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( | ||
| JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jlong batch_size, | ||
| jlong memory_pool_id) { | ||
| JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, | ||
| jobject substrait_projection, jobject substrait_filter, | ||
| jlong batch_size, jlong memory_pool_id) { | ||
| JNI_METHOD_START | ||
| arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id); | ||
| if (pool == nullptr) { | ||
|
|
@@ -474,6 +484,39 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann | |
| std::vector<std::string> column_vector = ToStringVector(env, columns); | ||
| JniAssertOkOrThrow(scanner_builder->Project(column_vector)); | ||
| } | ||
| if (substrait_projection != nullptr) { | ||
| std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env, | ||
| substrait_projection); | ||
| std::vector<arrow::compute::Expression> project_exprs; | ||
| std::vector<std::string> project_names; | ||
| arrow::engine::BoundExpressions bounded_expression = | ||
| JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer)); | ||
| for(arrow::engine::NamedExpression& named_expression : | ||
| bounded_expression.named_expressions) { | ||
| if (!(named_expression.expression.type()->id() == arrow::Type::BOOL)) { | ||
| project_exprs.push_back(std::move(named_expression.expression)); | ||
| project_names.push_back(std::move(named_expression.name)); | ||
| } | ||
| } | ||
| JniAssertOkOrThrow(scanner_builder->Project(std::move(project_exprs), std::move(project_names))); | ||
| } | ||
| if (substrait_filter != nullptr) { | ||
| std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env, | ||
| substrait_filter); | ||
| std::optional<arrow::compute::Expression> filter_expr; | ||
| arrow::engine::BoundExpressions bounded_expression = | ||
| JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer)); | ||
| for(arrow::engine::NamedExpression& named_expression : | ||
| bounded_expression.named_expressions) { | ||
| if (named_expression.expression.type()->id() == arrow::Type::BOOL) { | ||
| if (filter_expr.has_value()) { | ||
| JniThrow("Only one filter expression may be provided"); | ||
| } | ||
| filter_expr = named_expression.expression; | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Throw if the expression is not of type BOOL.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
| } | ||
| JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will crash if you provide an empty list of expressions.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed |
||
| } | ||
| JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); | ||
|
|
||
| auto scanner = JniGetOrThrow(scanner_builder->Finish()); | ||
|
|
@@ -748,10 +791,7 @@ JNIEXPORT void JNICALL | |
| arrow::engine::ConversionOptions conversion_options; | ||
| conversion_options.named_table_provider = std::move(table_provider); | ||
| // mapping arrow::Buffer | ||
| auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan)); | ||
| int length = env->GetDirectBufferCapacity(plan); | ||
| std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length)); | ||
| std::memcpy(buffer->mutable_data(), buff, length); | ||
| std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env, plan); | ||
| // execute plan | ||
| std::shared_ptr<arrow::RecordBatchReader> reader_out = | ||
| JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.arrow.dataset.scanner; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.Optional; | ||
|
|
||
| import org.apache.arrow.util.Preconditions; | ||
|
|
@@ -25,8 +26,10 @@ | |
| * Options used during scanning. | ||
| */ | ||
| public class ScanOptions { | ||
| private final Optional<String[]> columns; | ||
| private final long batchSize; | ||
| private final Optional<String[]> columns; | ||
| private final Optional<ByteBuffer> substraitProjection; | ||
| private final Optional<ByteBuffer> substraitFilter; | ||
|
|
||
| /** | ||
| * Constructor. | ||
|
|
@@ -56,6 +59,8 @@ public ScanOptions(long batchSize, Optional<String[]> columns) { | |
| Preconditions.checkNotNull(columns); | ||
| this.batchSize = batchSize; | ||
| this.columns = columns; | ||
| this.substraitProjection = Optional.empty(); | ||
| this.substraitFilter = Optional.empty(); | ||
| } | ||
|
|
||
| public ScanOptions(long batchSize) { | ||
|
|
@@ -69,4 +74,77 @@ public Optional<String[]> getColumns() { | |
| public long getBatchSize() { | ||
| return batchSize; | ||
| } | ||
|
|
||
| public Optional<ByteBuffer> getSubstraitProjection() { | ||
| return substraitProjection; | ||
| } | ||
|
|
||
| public Optional<ByteBuffer> getSubstraitFilter() { | ||
| return substraitFilter; | ||
| } | ||
|
|
||
| /** | ||
| * Builder for Options used during scanning. | ||
| */ | ||
| public static class Builder { | ||
| private final long batchSize; | ||
| private Optional<String[]> columns; | ||
| private ByteBuffer substraitProjection; | ||
| private ByteBuffer substraitFilter; | ||
|
Comment on lines
+92
to
+93
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
|
|
||
| /** | ||
| * Constructor. | ||
| * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} | ||
| */ | ||
| public Builder(long batchSize) { | ||
| this.batchSize = batchSize; | ||
| } | ||
|
|
||
| /** | ||
| * Set the Projected columns. Empty for scanning all columns. | ||
| * | ||
| * @param columns Projected columns. Empty for scanning all columns. | ||
| * @return the ScanOptions configured. | ||
| */ | ||
| public Builder columns(Optional<String[]> columns) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more thing: We don't need
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The values for substraitProjection and substraitFilter have been changed. The rule definition for columns mentions that empty means scanning all columns, so that is how it works.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see. I assumed a user would prefer to only use the |
||
| Preconditions.checkNotNull(columns); | ||
| this.columns = columns; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Set the Substrait extended expression for Projection new columns. | ||
| * | ||
| * @param substraitProjection Expressions to evaluate for project new columns. | ||
| * @return the ScanOptions configured. | ||
| */ | ||
| public Builder substraitProjection(ByteBuffer substraitProjection) { | ||
| Preconditions.checkNotNull(substraitProjection); | ||
| this.substraitProjection = substraitProjection; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Set the Substrait extended expression for Filter. | ||
| * | ||
| * @param substraitFilter Expressions to evaluate for apply Filter. | ||
| * @return the ScanOptions configured. | ||
| */ | ||
| public Builder substraitFilter(ByteBuffer substraitFilter) { | ||
| Preconditions.checkNotNull(substraitFilter); | ||
| this.substraitFilter = substraitFilter; | ||
| return this; | ||
| } | ||
|
|
||
| public ScanOptions build() { | ||
| return new ScanOptions(this); | ||
| } | ||
| } | ||
|
|
||
| private ScanOptions(Builder builder) { | ||
| batchSize = builder.batchSize; | ||
| columns = builder.columns; | ||
| substraitProjection = Optional.ofNullable(builder.substraitProjection); | ||
| substraitFilter = Optional.ofNullable(builder.substraitFilter); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.