diff --git a/python/pyproject.toml b/python/pyproject.toml index 24a6ea2a00e..a90aac3f408 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -60,7 +60,8 @@ tests = [ # tensorflow is broken on 2.20.0 with macOS ARM. "tensorflow<=2.19.0", "tqdm", - "datafusion", + # Need to align with the datafusion version we use at lance rust + "datafusion==48.0.0", ] dev = ["ruff==0.4.1", "pyright"] benchmarks = ["pytest-benchmark"] diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index bf592bccc60..6ac046842ee 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -40,35 +40,33 @@ def test_json_basic_write_read(): # Read back the dataset dataset = lance.dataset(dataset_path) - # Verify schema + # Verify storage schema assert len(dataset.schema) == 2 assert dataset.schema.field("id").type == pa.int32() - # Check that JSON field is properly recognized - json_field_schema = dataset.schema.field("data") - # Should be PyArrow JSON type - assert json_field_schema.type == pa.json_() + # Check that JSON field is stored as JSONB internally + storage_field = dataset.schema.field("data") + assert storage_field.type == pa.large_binary() + assert storage_field.metadata is not None + assert b"ARROW:extension:name" in storage_field.metadata + assert storage_field.metadata[b"ARROW:extension:name"] == b"lance.json" # Read data back result_table = dataset.to_table() + # Check that data is returned as Arrow JSON for Python + result_field = result_table.schema.field("data") + # PyArrow extension types print as extension but + # the storage type is utf8 + assert ( + str(result_field.type) == "extension" + or result_field.type == pa.utf8() + ) + # Verify data assert result_table.num_rows == 5 assert result_table.column("id").to_pylist() == [1, 2, 3, 4, 5] - # Verify the data column is JSON type - data_column = result_table.column("data") - assert data_column.type == pa.json_() - - # Verify the JSON data is correctly preserved - for i, expected in enumerate(json_strings): - actual = data_column[i].as_py() - if expected is None: - assert actual is None - else: - # Arrow JSON returns strings, verify they match - assert actual == expected - def test_json_with_other_types(): """Test JSON type alongside other data types.""" @@ -184,3 +182,176 @@ def test_json_batch_operations(): for batch in batches: assert batch.num_rows == batch_size + + +def test_json_path_queries(): + """Test JSON path queries using json_extract and json_exists.""" + # Create test data with JSON columns + json_data = [ + {"user": {"name": "Alice", "age": 30}, "tags": ["python", "ml"]}, + {"user": {"name": "Bob", "age": 25}, "tags": ["rust", "db"]}, + {"user": {"name": "Charlie"}, "tags": []}, + None, + ] + + json_strings = [json.dumps(d) if d is not None else None for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + # Create a Lance dataset with JSON data + table = pa.table( + { + "id": [1, 2, 3, 4], + "data": json_arr, + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "json_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Test json_extract + result = dataset.to_table( + filter="json_extract(data, '$.user.name') = '\"Alice\"'" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test json_exists + result = dataset.to_table(filter="json_exists(data, '$.user.age')") + assert result.num_rows == 2 # Alice and Bob have age field + + # 
Test json_array_contains + result = dataset.to_table( + filter="json_array_contains(data, '$.tags', 'python')" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + +def test_json_get_functions(): + """Test json_get_* functions for type-safe value extraction.""" + # Create test data with various JSON types + json_data = [ + {"name": "Alice", "age": 30, "active": True, "score": 95.5}, + {"name": "Bob", "age": 25, "active": False, "score": 87.3}, + {"name": "Charlie", "age": "35", "active": "true", "score": "92"}, + {"name": "David"}, # Missing fields + ] + + json_strings = [json.dumps(d) for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + table = pa.table( + { + "id": [1, 2, 3, 4], + "data": json_arr, + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "json_get_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Test json_get_string + result = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test json_get_int with type coercion + result = dataset.to_table(filter="json_get_int(data, 'age') > 28") + assert result.num_rows == 2 # Alice (30) and Charlie ("35" -> 35) + + # Test json_get_bool with type coercion + result = dataset.to_table(filter="json_get_bool(data, 'active') = true") + assert result.num_rows == 2 # Alice (true) and Charlie ("true" -> true) + + # Test json_get_float + result = dataset.to_table(filter="json_get_float(data, 'score') > 90") + assert result.num_rows == 2 # Alice (95.5) and Charlie ("92" -> 92.0) + + +def test_nested_json_access(): + """Test accessing nested JSON structures.""" + json_data = [ + {"user": {"profile": {"name": "Alice", "settings": {"theme": "dark"}}}}, + {"user": {"profile": {"name": "Bob", "settings": {"theme": "light"}}}}, + ] + + json_strings = [json.dumps(d) for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + table = pa.table( + { + "id": [1, 2], + "data": json_arr, + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "nested_json_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Access nested fields using json_get recursively + # First get user, then profile, then name + result = dataset.to_table( + filter=""" + json_get_string( + json_get( + json_get(data, 'user'), + 'profile'), + 'name') + = 'Alice'""" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Or use JSONPath for deep access + result = dataset.to_table( + filter="json_extract(data, '$.user.profile.settings.theme') = '\"dark\"'" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + +def test_json_array_operations(): + """Test JSON array operations.""" + json_data = [ + {"items": ["apple", "banana", "orange"], "counts": [1, 2, 3, 4, 5]}, + {"items": ["grape", "melon"], "counts": [10, 20]}, + {"items": [], "counts": []}, + ] + + json_strings = [json.dumps(d) for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + table = pa.table( + { + "id": [1, 2, 3], + "data": json_arr, + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "array_json_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Test array contains + result = dataset.to_table( + 
filter="json_array_contains(data, '$.items', 'apple')" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test array length + result = dataset.to_table(filter="json_array_length(data, '$.counts') > 3") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test empty array + result = dataset.to_table(filter="json_array_length(data, '$.items') = 0") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 3 diff --git a/rust/lance-arrow/src/json.rs b/rust/lance-arrow/src/json.rs index 244bb446504..1b1e4dead0e 100644 --- a/rust/lance-arrow/src/json.rs +++ b/rust/lance-arrow/src/json.rs @@ -7,15 +7,18 @@ use std::convert::TryFrom; use std::sync::Arc; use arrow_array::builder::LargeBinaryBuilder; -use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, StringArray}; +use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray}; use arrow_data::ArrayData; -use arrow_schema::{ArrowError, DataType, Field as ArrowField}; +use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema}; use crate::ARROW_EXT_NAME_KEY; -/// Arrow extension type name for JSON data +/// Arrow extension type name for JSON data (Lance internal) pub const JSON_EXT_NAME: &str = "lance.json"; +/// Arrow extension type name for JSON data (Arrow official) +pub const ARROW_JSON_EXT_NAME: &str = "arrow.json"; + /// Check if a field is a JSON extension field (Lance internal JSONB storage) pub fn is_json_field(field: &ArrowField) -> bool { field.data_type() == &DataType::LargeBinary @@ -26,6 +29,17 @@ pub fn is_json_field(field: &ArrowField) -> bool { .unwrap_or_default() } +/// Check if a field is an Arrow JSON extension field (PyArrow pa.json() type) +pub fn is_arrow_json_field(field: &ArrowField) -> bool { + // Arrow JSON extension type uses Utf8 or LargeUtf8 as storage type + (field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8) + && field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == ARROW_JSON_EXT_NAME) + .unwrap_or_default() +} + /// Check if a field or any of its descendants is a JSON field pub fn has_json_fields(field: &ArrowField) -> bool { if is_json_field(field) { @@ -309,8 +323,9 @@ fn get_json_path( ) -> Result, Box> { let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); - match raw_jsonb.select_by_path(&json_path) { + match selector.select_values(&json_path) { Ok(values) => { if values.is_empty() { Ok(None) @@ -322,6 +337,166 @@ fn get_json_path( } } +/// Convert an Arrow JSON field to Lance JSON field (with JSONB storage) +pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField { + if is_arrow_json_field(field) { + // Convert Arrow JSON (Utf8/LargeUtf8) to Lance JSON (LargeBinary) + // Preserve all metadata from the original field + let mut new_field = + ArrowField::new(field.name(), DataType::LargeBinary, field.is_nullable()); + + // Copy all metadata from the original field + let mut metadata = field.metadata().clone(); + // Add/override the extension metadata for Lance JSON + metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string()); + + new_field = new_field.with_metadata(metadata); + new_field + } else { + field.clone() + } +} + +/// Convert a RecordBatch with Lance JSON columns (JSONB) back to Arrow JSON format (strings) +pub fn convert_lance_json_to_arrow( + batch: &arrow_array::RecordBatch, +) -> 
Result { + let schema = batch.schema(); + let mut needs_conversion = false; + let mut new_fields = Vec::with_capacity(schema.fields().len()); + let mut new_columns = Vec::with_capacity(batch.num_columns()); + + for (i, field) in schema.fields().iter().enumerate() { + let column = batch.column(i); + + if is_json_field(field) { + needs_conversion = true; + + // Convert the field back to Arrow JSON (Utf8) + let mut new_field = ArrowField::new(field.name(), DataType::Utf8, field.is_nullable()); + let mut metadata = field.metadata().clone(); + // Change from lance.json to arrow.json + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + new_field.set_metadata(metadata); + new_fields.push(new_field); + + // Convert the data from JSONB to JSON strings + if batch.num_rows() == 0 { + // For empty batches, create an empty String array + let empty_strings = arrow_array::builder::StringBuilder::new().finish(); + new_columns.push(Arc::new(empty_strings) as ArrayRef); + } else { + // Convert JSONB back to JSON strings + let binary_array = column + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Lance JSON field '{}' has unexpected type", + field.name() + )) + })?; + + let mut builder = arrow_array::builder::StringBuilder::new(); + for i in 0..binary_array.len() { + if binary_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = binary_array.value(i); + let json_str = decode_json(jsonb_bytes).map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Failed to decode JSON: {}", + e + )) + })?; + builder.append_value(&json_str); + } + } + new_columns.push(Arc::new(builder.finish()) as ArrayRef); + } + } else { + new_fields.push(field.as_ref().clone()); + new_columns.push(column.clone()); + } + } + + if needs_conversion { + let new_schema = Arc::new(Schema::new_with_metadata( + new_fields, + schema.metadata().clone(), + )); + RecordBatch::try_new(new_schema, new_columns) + } else { + // No conversion needed, return original batch + Ok(batch.clone()) + } +} + +/// Convert a RecordBatch with Arrow JSON columns to Lance JSON format (JSONB) +pub fn convert_json_columns( + batch: &arrow_array::RecordBatch, +) -> Result { + let schema = batch.schema(); + let mut needs_conversion = false; + let mut new_fields = Vec::with_capacity(schema.fields().len()); + let mut new_columns = Vec::with_capacity(batch.num_columns()); + + for (i, field) in schema.fields().iter().enumerate() { + let column = batch.column(i); + + if is_arrow_json_field(field) { + needs_conversion = true; + + // Convert the field metadata + new_fields.push(arrow_json_to_lance_json(field)); + + // Convert the data from JSON strings to JSONB + if batch.num_rows() == 0 { + // For empty batches, create an empty LargeBinary array + let empty_binary = LargeBinaryBuilder::new().finish(); + new_columns.push(Arc::new(empty_binary) as ArrayRef); + } else { + // Convert non-empty data + let json_array = + if let Some(string_array) = column.as_any().downcast_ref::() { + JsonArray::try_from(string_array)? + } else if let Some(large_string_array) = + column.as_any().downcast_ref::() + { + JsonArray::try_from(large_string_array)? 
+ } else { + return Err(ArrowError::InvalidArgumentError(format!( + "Arrow JSON field '{}' has unexpected storage type: {:?}", + field.name(), + column.data_type() + ))); + }; + + let binary_array = json_array.into_inner(); + + new_columns.push(Arc::new(binary_array) as ArrayRef); + } + } else { + new_fields.push(field.as_ref().clone()); + new_columns.push(column.clone()); + } + } + + if needs_conversion { + let new_schema = Arc::new(Schema::new_with_metadata( + new_fields, + schema.metadata().clone(), + )); + RecordBatch::try_new(new_schema, new_columns) + } else { + // No conversion needed, return original batch + Ok(batch.clone()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -382,4 +557,52 @@ mod tests { let age = json_array.json_path(1, "$.user.age").unwrap(); assert_eq!(age, None); } + + #[test] + fn test_convert_json_columns() { + // Create a batch with Arrow JSON column + let json_strings = vec![Some(r#"{"name": "Alice"}"#), Some(r#"{"name": "Bob"}"#)]; + let json_arr = StringArray::from(json_strings); + + // Create field with arrow.json extension + let mut field = ArrowField::new("data", DataType::Utf8, false); + let mut metadata = std::collections::HashMap::new(); + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + field.set_metadata(metadata); + + let schema = Arc::new(Schema::new(vec![field])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(json_arr) as ArrayRef]).unwrap(); + + // Convert the batch + let converted = convert_json_columns(&batch).unwrap(); + + // Check the converted schema + assert_eq!(converted.num_columns(), 1); + let converted_schema = converted.schema(); + let converted_field = converted_schema.field(0); + assert_eq!(converted_field.data_type(), &DataType::LargeBinary); + assert_eq!( + converted_field.metadata().get(ARROW_EXT_NAME_KEY), + Some(&JSON_EXT_NAME.to_string()) + ); + + // Check the data was converted + let converted_column = converted.column(0); + assert_eq!(converted_column.data_type(), &DataType::LargeBinary); + assert_eq!(converted_column.len(), 2); + + // Verify the data is valid JSONB + let binary_array = converted_column + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..binary_array.len() { + let jsonb_bytes = binary_array.value(i); + let decoded = decode_json(jsonb_bytes).unwrap(); + assert!(decoded.contains("name")); + } + } } diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 1d236dfd313..0c0b635b9ad 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -20,7 +20,10 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field as ArrowField}; use deepsize::DeepSizeOf; -use lance_arrow::{json::is_json_field, ARROW_EXT_NAME_KEY, *}; +use lance_arrow::{ + json::{is_arrow_json_field, is_json_field}, + ARROW_EXT_NAME_KEY, *, +}; use snafu::location; use super::{ @@ -988,8 +991,8 @@ impl TryFrom<&ArrowField> for Field { .map(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes")) .unwrap_or(false); - // Check for JSON extension type - let logical_type = if is_json_field(field) { + // Check for JSON extension types (both Arrow and Lance) + let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") } else { LogicalType::try_from(field.data_type())? 
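Note on the conversion pair above: the two helpers added in rust/lance-arrow/src/json.rs round-trip a JSON column between the two extension encodings. convert_json_columns turns arrow.json (Utf8 storage, as produced by PyArrow's pa.json_()) into lance.json (JSONB bytes in a LargeBinary column) for storage, and convert_lance_json_to_arrow decodes it back to JSON strings on read. Below is a minimal sketch of that round-trip, mirroring the test_convert_json_columns test in this patch; the round_trip wrapper is illustrative, everything else it calls is introduced by this diff.

use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use lance_arrow::json::{convert_json_columns, convert_lance_json_to_arrow, ARROW_JSON_EXT_NAME};
use lance_arrow::ARROW_EXT_NAME_KEY;

fn round_trip() -> Result<(), ArrowError> {
    // An arrow.json column: Utf8 storage tagged with the arrow.json extension name,
    // which is how PyArrow's pa.json_() arrays arrive on the Rust side.
    let mut field = Field::new("data", DataType::Utf8, true);
    let mut metadata = HashMap::new();
    metadata.insert(ARROW_EXT_NAME_KEY.to_string(), ARROW_JSON_EXT_NAME.to_string());
    field.set_metadata(metadata);

    let schema = Arc::new(Schema::new(vec![field]));
    let json = StringArray::from(vec![Some(r#"{"name": "Alice"}"#), Some(r#"{"name": "Bob"}"#)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(json) as ArrayRef])?;

    // Write direction: JSON strings are encoded as JSONB and the field becomes
    // LargeBinary tagged with the lance.json extension name.
    let stored = convert_json_columns(&batch)?;
    assert_eq!(stored.schema().field(0).data_type(), &DataType::LargeBinary);

    // Read direction: JSONB bytes are decoded back to JSON strings (arrow.json).
    let read_back = convert_lance_json_to_arrow(&stored)?;
    assert_eq!(read_back.schema().field(0).data_type(), &DataType::Utf8);

    Ok(())
}

This is the same per-batch conversion that wrap_json_stream_for_writing and wrap_json_stream_for_reading apply at the stream boundaries later in this diff.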
diff --git a/rust/lance-datafusion/src/planner.rs b/rust/lance-datafusion/src/planner.rs index a654a8d653f..f231948f1e4 100644 --- a/rust/lance-datafusion/src/planner.rs +++ b/rust/lance-datafusion/src/planner.rs @@ -20,7 +20,7 @@ use datafusion::common::DFSchema; use datafusion::config::ConfigOptions; use datafusion::error::Result as DFResult; use datafusion::execution::config::SessionConfig; -use datafusion::execution::context::SessionState; +use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::expr::ScalarFunction; @@ -162,20 +162,24 @@ impl Default for LanceContextProvider { fn default() -> Self { let config = SessionConfig::new(); let runtime = RuntimeEnvBuilder::new().build_arc().unwrap(); + + let ctx = SessionContext::new_with_config_rt(config.clone(), runtime.clone()); + crate::udf::register_functions(&ctx); + + let state = ctx.state(); + + // SessionState does not expose expr_planners, so we need to get them separately let mut state_builder = SessionStateBuilder::new() .with_config(config) .with_runtime_env(runtime) .with_default_features(); - // SessionState does not expose expr_planners, so we need to get the default ones from - // the builder and store them to return from get_expr_planners - // unwrap safe because with_default_features sets expr_planners let expr_planners = state_builder.expr_planners().as_ref().unwrap().clone(); Self { options: ConfigOptions::default(), - state: state_builder.build(), + state, expr_planners, } } diff --git a/rust/lance-datafusion/src/udf.rs b/rust/lance-datafusion/src/udf.rs index 7e948ce2c31..cbfa26c29ba 100644 --- a/rust/lance-datafusion/src/udf.rs +++ b/rust/lance-datafusion/src/udf.rs @@ -10,12 +10,21 @@ use datafusion::prelude::SessionContext; use datafusion_functions::utils::make_scalar_function; use std::sync::{Arc, LazyLock}; -mod json; +pub(crate) mod json; /// Register UDF functions to datafusion context. pub fn register_functions(ctx: &SessionContext) { ctx.register_udf(CONTAINS_TOKENS_UDF.clone()); + // JSON functions ctx.register_udf(json::json_extract_udf()); + ctx.register_udf(json::json_exists_udf()); + ctx.register_udf(json::json_get_udf()); + ctx.register_udf(json::json_get_string_udf()); + ctx.register_udf(json::json_get_int_udf()); + ctx.register_udf(json::json_get_float_udf()); + ctx.register_udf(json::json_get_bool_udf()); + ctx.register_udf(json::json_array_contains_udf()); + ctx.register_udf(json::json_array_length_udf()); } /// This method checks whether a string contains all specified tokens. 
The tokens are separated by diff --git a/rust/lance-datafusion/src/udf/json.rs b/rust/lance-datafusion/src/udf/json.rs index 1cfe9611af6..c8c29c2f7cd 100644 --- a/rust/lance-datafusion/src/udf/json.rs +++ b/rust/lance-datafusion/src/udf/json.rs @@ -1,16 +1,209 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use arrow_array::builder::StringBuilder; +use arrow_array::builder::{ + BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, +}; use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray}; use arrow_schema::DataType; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datafusion::prelude::create_udf; use std::sync::Arc; -/// Create the json_extract UDF for extracting JSONPath from JSONB data +/// Common helper functions and types for JSON UDFs +mod common { + use super::*; + + /// Key type for JSON field access - optimizes field/index parsing + #[derive(Debug, Clone)] + pub enum KeyType { + Field(String), + Index(usize), + } + + impl KeyType { + /// Parse a key string into either a field name or array index (once per operation) + pub fn parse(key: &str) -> Self { + if let Ok(index) = key.parse::() { + Self::Index(index) + } else { + Self::Field(key.to_string()) + } + } + } + + /// Convert ColumnarValue arguments to ArrayRef vector + /// + /// Note: This implementation currently broadcasts scalars to arrays. + /// Future optimization: handle scalars directly without broadcasting + /// to improve performance for scalar inputs. + pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec { + args.iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect() + } + + /// Create DataFusionError for execution failures (simplified error wrapping) + pub fn execution_error(msg: impl Into) -> DataFusionError { + DataFusionError::Execution(msg.into()) + } + + /// Validate argument count for UDF + pub fn validate_arg_count( + args: &[ArrayRef], + expected: usize, + function_name: &str, + ) -> Result<()> { + if args.len() != expected { + return Err(execution_error(format!( + "{} requires exactly {} arguments", + function_name, expected + ))); + } + Ok(()) + } + + /// Extract and validate LargeBinaryArray from first argument + pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> { + args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| execution_error("First argument must be LargeBinary")) + } + + /// Extract and validate StringArray from specified argument + pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> { + args[arg_index] + .as_any() + .downcast_ref::() + .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1))) + } + + /// Get string value at index, handling scalar broadcast case + /// When a scalar is converted to an array, it becomes a single-element array + /// This function handles accessing that value repeatedly for all rows + pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> { + // Handle scalar broadcast case: if array has only 1 element, always use index 0 + let actual_index = if string_array.len() == 1 { 0 } else { index }; + + if string_array.is_null(actual_index) { + None + } else { + Some(string_array.value(actual_index)) + } + } + + /// Get JSON 
field/element using pre-parsed key type (avoids repeated parsing) + pub fn get_json_value_by_key( + raw_jsonb: &jsonb::RawJsonb, + key_type: &KeyType, + ) -> Result> { + match key_type { + KeyType::Field(field) => raw_jsonb + .get_by_name(field, false) + .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))), + KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| { + execution_error(format!("Failed to get array element [{}]: {}", index, e)) + }), + } + } + + /// Parse JSONPath with proper error handling (no false returns) + pub fn parse_json_path(path: &str) -> Result> { + jsonb::jsonpath::parse_json_path(path.as_bytes()) + .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e))) + } +} + +/// Convert JSONB value to string using jsonb's built-in serde (strict mode) +fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? + { + return Ok(None); + } + + // Use jsonb's built-in to_str() method - strict conversion + raw_jsonb + .to_str() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e))) +} + +/// Convert JSONB value to integer using jsonb's built-in serde (strict mode) +fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? + { + return Ok(None); + } + + // Use jsonb's built-in to_i64() method - strict conversion + raw_jsonb + .to_i64() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e))) +} + +/// Convert JSONB value to float using jsonb's built-in serde (strict mode) +fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? + { + return Ok(None); + } + + // Use jsonb's built-in to_f64() method - strict conversion + raw_jsonb + .to_f64() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e))) +} + +/// Convert JSONB value to boolean using jsonb's built-in serde (strict mode) +fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? 
+ { + return Ok(None); + } + + // Use jsonb's built-in to_bool() method - strict conversion + raw_jsonb + .to_bool() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e))) +} + +/// Create the json_extract UDF for extracting JSONPath from JSON data +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath expression as string (Utf8) +/// +/// # Returns +/// String representation of the extracted value, or null if path not found pub fn json_extract_udf() -> ScalarUDF { create_udf( "json_extract", @@ -23,63 +216,30 @@ pub fn json_extract_udf() -> ScalarUDF { /// Implementation of json_extract function with ColumnarValue fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_extract_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_extract function fn json_extract_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_extract requires exactly 2 arguments".to_string(), - )); - } - - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let path_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = StringBuilder::new(); + common::validate_arg_count(args, 2, "json_extract")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; + let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || path_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(path) = common::get_string_value_at(path_array, i) { let jsonb_bytes = jsonb_array.value(i); - let path = path_array.value(i); - - match extract_json_path(jsonb_bytes, path) { - Ok(Some(value)) => builder.append_value(&value), - Ok(None) => builder.append_null(), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to extract JSONPath: {}", - e - ))); - } + match extract_json_path(jsonb_bytes, path)? 
{ + Some(value) => builder.append_value(&value), + None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -87,13 +247,14 @@ fn json_extract_impl(args: &[ArrayRef]) -> Result { } /// Extract value from JSONB using JSONPath +/// +/// Note: Uses `select_values` instead of the deprecated `select_by_path` method fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { - let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { - datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) - })?; + let json_path = common::parse_json_path(path)?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - match raw_jsonb.select_by_path(&json_path) { + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.select_values(&json_path) { Ok(values) => { if values.is_empty() { Ok(None) @@ -102,7 +263,542 @@ fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { Ok(Some(values[0].to_string())) } } - Err(_) => Ok(None), // Path not found or error + Err(e) => Err(common::execution_error(format!( + "Failed to select values from path '{}': {}", + path, e + ))), + } +} + +/// Create the json_exists UDF for checking if a JSONPath exists +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath expression as string (Utf8) +/// +/// # Returns +/// Boolean indicating whether the path exists in the JSON data +pub fn json_exists_udf() -> ScalarUDF { + create_udf( + "json_exists", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Boolean, + Volatility::Immutable, + Arc::new(json_exists_columnar_impl), + ) +} + +/// Implementation of json_exists function with ColumnarValue +fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_exists_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_exists function +fn json_exists_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_exists")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; + + let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(path) = common::get_string_value_at(path_array, i) { + let jsonb_bytes = jsonb_array.value(i); + let exists = check_json_path_exists(jsonb_bytes, path)?; + builder.append_value(exists); + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Check if a JSONPath exists in JSONB +fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result { + let json_path = common::parse_json_path(path)?; + + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.exists(&json_path) { + Ok(exists) => Ok(exists), + Err(e) => Err(common::execution_error(format!( + "Failed to check existence of path '{}': {}", + path, e + ))), + } +} + +/// Create the json_get UDF for getting a field value as JSON string +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Raw JSONB bytes of the field value, or null if not found +pub fn json_get_udf() -> ScalarUDF { + create_udf( + "json_get", + vec![DataType::LargeBinary, DataType::Utf8], + 
DataType::LargeBinary, + Volatility::Immutable, + Arc::new(json_get_columnar_impl), + ) +} + +/// Implementation of json_get function with ColumnarValue +fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_get_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get function +fn json_get_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_get")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; + + let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(key) = common::get_string_value_at(key_array, i) { + let jsonb_bytes = jsonb_array.value(i); + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => builder.append_value(value.as_raw().as_ref()), + None => builder.append_null(), + } + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Create the json_get_string UDF for getting a string value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// String value with type coercion (numbers/booleans converted to strings) +pub fn json_get_string_udf() -> ScalarUDF { + create_udf( + "json_get_string", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Utf8, + Volatility::Immutable, + Arc::new(json_get_string_columnar_impl), + ) +} + +/// Implementation of json_get_string function with ColumnarValue +fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_get_string_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_string function +fn json_get_string_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_get_string")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; + + let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(key) = common::get_string_value_at(key_array, i) { + let jsonb_bytes = jsonb_array.value(i); + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_string(value)? 
{ + Some(string_val) => builder.append_value(&string_val), + None => builder.append_null(), + }, + None => builder.append_null(), + } + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Create the json_get_int UDF for getting an integer value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Integer value with type coercion (strings/floats/booleans converted to int) +pub fn json_get_int_udf() -> ScalarUDF { + create_udf( + "json_get_int", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Int64, + Volatility::Immutable, + Arc::new(json_get_int_columnar_impl), + ) +} + +/// Implementation of json_get_int function with ColumnarValue +fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_get_int_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_int function +fn json_get_int_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_get_int")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; + + let mut builder = Int64Builder::with_capacity(jsonb_array.len()); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(key) = common::get_string_value_at(key_array, i) { + let jsonb_bytes = jsonb_array.value(i); + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_int(value)? { + Some(int_val) => builder.append_value(int_val), + None => builder.append_null(), + }, + None => builder.append_null(), + } + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Create the json_get_float UDF for getting a float value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Float value with type coercion (strings/integers/booleans converted to float) +pub fn json_get_float_udf() -> ScalarUDF { + create_udf( + "json_get_float", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Float64, + Volatility::Immutable, + Arc::new(json_get_float_columnar_impl), + ) +} + +/// Implementation of json_get_float function with ColumnarValue +fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_get_float_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_float function +fn json_get_float_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_get_float")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; + + let mut builder = Float64Builder::with_capacity(jsonb_array.len()); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(key) = common::get_string_value_at(key_array, i) { + let jsonb_bytes = jsonb_array.value(i); + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_float(value)? 
{ + Some(float_val) => builder.append_value(float_val), + None => builder.append_null(), + }, + None => builder.append_null(), + } + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Create the json_get_bool UDF for getting a boolean value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Boolean value with flexible type coercion (strings like 'true'/'yes'/'1' become true) +pub fn json_get_bool_udf() -> ScalarUDF { + create_udf( + "json_get_bool", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Boolean, + Volatility::Immutable, + Arc::new(json_get_bool_columnar_impl), + ) +} + +/// Implementation of json_get_bool function with ColumnarValue +fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_get_bool_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_bool function +fn json_get_bool_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_get_bool")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; + + let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(key) = common::get_string_value_at(key_array, i) { + let jsonb_bytes = jsonb_array.value(i); + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_bool(value)? 
{ + Some(bool_val) => builder.append_value(bool_val), + None => builder.append_null(), + }, + None => builder.append_null(), + } + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Create the json_array_contains UDF for checking if array contains a value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath to array location (Utf8) +/// * Third parameter: Value to search for as string (Utf8) +/// +/// # Returns +/// Boolean indicating whether the array contains the specified value +pub fn json_array_contains_udf() -> ScalarUDF { + create_udf( + "json_array_contains", + vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8], + DataType::Boolean, + Volatility::Immutable, + Arc::new(json_array_contains_columnar_impl), + ) +} + +/// Implementation of json_array_contains function with ColumnarValue +fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_array_contains_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_array_contains function +fn json_array_contains_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 3, "json_array_contains")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; + let value_array = common::extract_string_array(args, 2)?; + + let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else { + let path = common::get_string_value_at(path_array, i); + let value = common::get_string_value_at(value_array, i); + + match (path, value) { + (Some(p), Some(v)) => { + let jsonb_bytes = jsonb_array.value(i); + let contains = check_array_contains(jsonb_bytes, p, v)?; + builder.append_value(contains); + } + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Check if a JSON array at path contains a value +fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result { + let json_path = common::parse_json_path(path)?; + + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.select_values(&json_path) { + Ok(values) => { + for v in values { + // Convert to raw JSONB for direct access + let raw = v.as_raw(); + // Check if it's an array by trying to iterate + let mut index = 0; + loop { + match raw.get_by_index(index) { + Ok(Some(elem)) => { + let elem_str = elem.to_string(); + // Compare as JSON strings (with quotes for strings) + if elem_str == value || elem_str == format!("\"{}\"", value) { + return Ok(true); + } + index += 1; + } + Ok(None) => break, // End of array + Err(_) => break, // Not an array or error + } + } + } + Ok(false) + } + Err(e) => Err(common::execution_error(format!( + "Failed to check array contains at path '{}': {}", + path, e + ))), + } +} + +/// Create the json_array_length UDF for getting array length +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath to array location (Utf8) +/// +/// # Returns +/// Integer length of the JSON array, or null if path doesn't point to an array +pub fn json_array_length_udf() -> ScalarUDF { + create_udf( + "json_array_length", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Int64, + Volatility::Immutable, + 
Arc::new(json_array_length_columnar_impl), + ) +} + +/// Implementation of json_array_length function with ColumnarValue +fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays = common::columnar_to_arrays(args); + let result = json_array_length_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_array_length function +fn json_array_length_impl(args: &[ArrayRef]) -> Result { + common::validate_arg_count(args, 2, "json_array_length")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; + + let mut builder = Int64Builder::with_capacity(jsonb_array.len()); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) { + builder.append_null(); + } else if let Some(path) = common::get_string_value_at(path_array, i) { + let jsonb_bytes = jsonb_array.value(i); + match get_array_length(jsonb_bytes, path)? { + Some(len) => builder.append_value(len), + None => builder.append_null(), + } + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Get the length of a JSON array at path +fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result> { + let json_path = common::parse_json_path(path)?; + + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.select_values(&json_path) { + Ok(values) => { + if values.is_empty() { + return Ok(None); + } + let first = &values[0]; + let raw = first.as_raw(); + + // Count array elements by iterating + let mut count = 0; + loop { + match raw.get_by_index(count) { + Ok(Some(_)) => count += 1, + Ok(None) => break, // End of array + Err(_) => { + // Not an array + if count == 0 { + return Err(common::execution_error(format!( + "Path '{}' does not point to an array", + path + ))); + } + break; + } + } + } + Ok(Some(count as i64)) + } + Err(e) => Err(common::execution_error(format!( + "Failed to get array length at path '{}': {}", + path, e + ))), } } @@ -110,14 +806,17 @@ fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { mod tests { use super::*; use arrow_array::builder::LargeBinaryBuilder; + use arrow_array::{BooleanArray, Int64Array}; + + fn create_test_jsonb(json_str: &str) -> Vec { + jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec() + } #[tokio::test] - async fn test_variant_extract_udf() -> Result<()> { - // Create test JSONB data + async fn test_json_extract_udf() -> Result<()> { let json = r#"{"user": {"name": "Alice", "age": 30}}"#; - let jsonb_bytes = jsonb::parse_value(json.as_bytes()).unwrap().to_vec(); + let jsonb_bytes = create_test_jsonb(json); - // Create arrays let mut binary_builder = LargeBinaryBuilder::new(); binary_builder.append_value(&jsonb_bytes); binary_builder.append_value(&jsonb_bytes); @@ -130,7 +829,6 @@ mod tests { Some("$.user.name"), ])); - // Call UDF let result = json_extract_impl(&[jsonb_array, path_array])?; let string_array = result.as_any().downcast_ref::().unwrap(); @@ -141,4 +839,223 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_json_exists_udf() -> Result<()> { + let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = 
Arc::new(binary_builder.finish()); + let path_array = Arc::new(StringArray::from(vec![ + Some("$.user.name"), + Some("$.user.email"), + Some("$.tags"), + Some("$.any"), + ])); + + let result = json_exists_impl(&[jsonb_array, path_array])?; + let bool_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(bool_array.len(), 4); + assert!(bool_array.value(0)); + assert!(!bool_array.value(1)); + assert!(bool_array.value(2)); + assert!(bool_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_string_udf() -> Result<()> { + // Test valid string conversions + let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("str"), + Some("num"), + Some("bool"), + Some("null"), + ])); + + let result = json_get_string_impl(&[jsonb_array, key_array])?; + let string_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(string_array.len(), 4); + assert_eq!(string_array.value(0), "hello"); + assert_eq!(string_array.value(1), "123"); + assert_eq!(string_array.value(2), "true"); + assert!(string_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_int_udf() -> Result<()> { + let json = r#"{"int": 42, "str_num": "99", "bool": true}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("int"), + Some("str_num"), + Some("bool"), + ])); + + let result = json_get_int_impl(&[jsonb_array, key_array])?; + let int_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(int_array.len(), 3); + assert_eq!(int_array.value(0), 42); + assert_eq!(int_array.value(1), 99); + assert_eq!(int_array.value(2), 1); // jsonb converts true to 1 + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_bool_udf() -> Result<()> { + let json = + r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("bool_true"), + Some("bool_false"), + Some("str_true"), + Some("str_false"), + ])); + + let result = json_get_bool_impl(&[jsonb_array, key_array])?; + let bool_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(bool_array.len(), 4); + assert!(bool_array.value(0)); + assert!(!bool_array.value(1)); + assert!(bool_array.value(2)); // "true" string converts to true + assert!(!bool_array.value(3)); // "false" string converts to false + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_contains_udf() -> Result<()> { + let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#; + let jsonb_bytes = 
create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = Arc::new(binary_builder.finish()); + let path_array = Arc::new(StringArray::from(vec![ + Some("$.tags"), + Some("$.tags"), + Some("$.nums"), + Some("$.tags"), + ])); + let value_array = Arc::new(StringArray::from(vec![ + Some("rust"), + Some("python"), + Some("2"), + Some("any"), + ])); + + let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?; + let bool_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(bool_array.len(), 4); + assert!(bool_array.value(0)); + assert!(!bool_array.value(1)); + assert!(bool_array.value(2)); + assert!(bool_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_length_udf() -> Result<()> { + let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = Arc::new(binary_builder.finish()); + let path_array = Arc::new(StringArray::from(vec![ + Some("$.empty"), + Some("$.tags"), + Some("$.nested.arr"), + Some("$.any"), + ])); + + let result = json_array_length_impl(&[jsonb_array, path_array])?; + let int_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(int_array.len(), 4); + assert_eq!(int_array.value(0), 0); + assert_eq!(int_array.value(1), 3); + assert_eq!(int_array.value(2), 2); + assert!(int_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_access() -> Result<()> { + let json = r#"["first", "second", "third"]"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("0"), + Some("1"), + Some("10"), // Out of bounds + ])); + + let result = json_get_string_impl(&[jsonb_array, key_array])?; + let string_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(string_array.len(), 3); + assert_eq!(string_array.value(0), "first"); + assert_eq!(string_array.value(1), "second"); + assert!(string_array.is_null(2)); + + Ok(()) + } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f06acffa73e..1c9ee6d721c 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -71,6 +71,7 @@ use tracing::{info_span, instrument, Span}; use super::Dataset; use crate::dataset::row_offsets_to_row_addresses; +use crate::dataset::utils::wrap_json_stream_for_reading; use crate::index::scalar::detect_scalar_index_type; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; use crate::index::DatasetIndexInternalExt; @@ -85,6 +86,7 @@ use crate::io::exec::{ }; use crate::{datatypes::Schema, io::exec::fts::BooleanQueryExec}; use crate::{Error, Result}; + use snafu::location; pub use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; @@ -3460,6 +3462,11 @@ pub struct DatasetRecordBatchStream { impl 
DatasetRecordBatchStream { pub fn new(exec_node: SendableRecordBatchStream) -> Self { + // Convert lance.json (JSONB) back to arrow.json (strings) for reading + // + // This is so bad, we need to find a way to remove this. + let exec_node = wrap_json_stream_for_reading(exec_node); + let span = info_span!("DatasetRecordBatchStream"); Self { exec_node, span } } diff --git a/rust/lance/src/dataset/utils.rs b/rust/lance/src/dataset/utils.rs index 4595d439683..c528f6d8c2f 100644 --- a/rust/lance/src/dataset/utils.rs +++ b/rust/lance/src/dataset/utils.rs @@ -1,20 +1,24 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use crate::Result; use arrow_array::{RecordBatch, UInt64Array}; +use arrow_schema::Schema as ArrowSchema; use datafusion::error::Result as DFResult; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; +use lance_arrow::json::{ + arrow_json_to_lance_json, convert_json_columns, convert_lance_json_to_arrow, + is_arrow_json_field, is_json_field, +}; +use lance_core::ROW_ID; +use lance_table::rowids::{RowIdIndex, RowIdSequence}; use roaring::RoaringTreemap; use std::borrow::Cow; use std::sync::mpsc::Receiver; use std::sync::Arc; -use crate::Result; -use lance_core::ROW_ID; -use lance_table::rowids::{RowIdIndex, RowIdSequence}; - fn extract_row_ids( row_ids: &mut CapturedRowIds, batch: RecordBatch, @@ -133,3 +137,110 @@ impl Default for CapturedRowIds { Self::AddressStyle(RoaringTreemap::new()) } } + +/// Wrap a stream to convert arrow.json to lance.json for writing +/// +// FIXME: this is bad, really bad, we need to find a way to remove this. +pub fn wrap_json_stream_for_writing( + stream: SendableRecordBatchStream, +) -> SendableRecordBatchStream { + // Check if any fields need conversion + let needs_conversion = stream + .schema() + .fields() + .iter() + .any(|f| is_arrow_json_field(f)); + + if !needs_conversion { + return stream; + } + + // Convert the schema + let arrow_schema = stream.schema(); + let mut new_fields = Vec::with_capacity(arrow_schema.fields().len()); + for field in arrow_schema.fields() { + if is_arrow_json_field(field) { + new_fields.push(Arc::new(arrow_json_to_lance_json(field))); + } else { + new_fields.push(Arc::clone(field)); + } + } + let converted_schema = Arc::new(ArrowSchema::new_with_metadata( + new_fields, + arrow_schema.metadata().clone(), + )); + + // Convert the stream + let converted_stream = stream.map(move |batch_result| { + batch_result.and_then(|batch| { + convert_json_columns(&batch) + .map_err(|e| datafusion::error::DataFusionError::ArrowError(e, None)) + }) + }); + + Box::pin(RecordBatchStreamAdapter::new( + converted_schema, + converted_stream, + )) +} + +/// Wrap a stream to convert lance.json (JSONB) back to arrow.json (strings) for reading +/// +// FIXME: this is bad, really bad, we need to find a way to remove this. 
+pub fn wrap_json_stream_for_reading( + stream: SendableRecordBatchStream, +) -> SendableRecordBatchStream { + use lance_arrow::json::ARROW_JSON_EXT_NAME; + use lance_arrow::ARROW_EXT_NAME_KEY; + + // Check if any fields need conversion + let needs_conversion = stream.schema().fields().iter().any(|f| is_json_field(f)); + + if !needs_conversion { + return stream; + } + + // Convert the schema + let arrow_schema = stream.schema(); + let mut new_fields = Vec::with_capacity(arrow_schema.fields().len()); + for field in arrow_schema.fields() { + if is_json_field(field) { + // Convert lance.json (LargeBinary) to arrow.json (Utf8) + let mut new_field = arrow_schema::Field::new( + field.name(), + arrow_schema::DataType::Utf8, + field.is_nullable(), + ); + let mut metadata = field.metadata().clone(); + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + new_field.set_metadata(metadata); + new_fields.push(new_field); + } else { + new_fields.push(field.as_ref().clone()); + } + } + let converted_schema = Arc::new(ArrowSchema::new_with_metadata( + new_fields, + arrow_schema.metadata().clone(), + )); + + // Convert the stream + let converted_stream = stream.map(move |batch_result| { + batch_result.and_then(|batch| { + convert_lance_json_to_arrow(&batch).map_err(|e| { + datafusion::error::DataFusionError::ArrowError( + arrow_schema::ArrowError::InvalidArgumentError(e.to_string()), + None, + ) + }) + }) + }); + + Box::pin(RecordBatchStreamAdapter::new( + converted_schema, + converted_stream, + )) +} diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 22a6ceeb9a7..f09b2485629 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -38,8 +38,11 @@ use crate::Dataset; use super::blob::BlobStreamExt; use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress}; use super::transaction::Transaction; +use super::utils::wrap_json_stream_for_writing; use super::DATA_DIR; +use lance_arrow::json::is_arrow_json_field; + mod commit; pub mod delete; mod insert; @@ -305,6 +308,11 @@ pub async fn do_write_fragments( params: WriteParams, storage_version: LanceFileVersion, ) -> Result> { + // Convert arrow.json to lance.json (JSONB) for storage if needed + // + // FIXME: this is bad, really bad, we need to find a way to remove this. + let data = wrap_json_stream_for_writing(data); + let mut buffered_reader = if storage_version == LanceFileVersion::Legacy { // In v1 we split the stream into row group sized batches chunk_stream(data, params.max_rows_per_group) @@ -386,6 +394,26 @@ pub async fn write_fragments_internal( data: SendableRecordBatchStream, mut params: WriteParams, ) -> Result { + // Convert Arrow JSON columns to Lance JSON (JSONB) format + // + // FIXME: this is bad, really bad, we need to find a way to remove this. 
+ let needs_conversion = data + .schema() + .fields() + .iter() + .any(|f| is_arrow_json_field(f)); + + let (data, converted_schema) = if needs_conversion { + let data = wrap_json_stream_for_writing(data); + // Update the schema to match the converted data + let arrow_schema = data.schema(); + let converted_schema = Schema::try_from(arrow_schema.as_ref())?; + (data, converted_schema) + } else { + // No conversion needed, use original schema to preserve dictionary info + (data, schema) + }; + // Make sure the max rows per group is not larger than the max rows per file params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file); @@ -393,7 +421,7 @@ pub async fn write_fragments_internal( match params.mode { WriteMode::Append | WriteMode::Create => { // Append mode, so we need to check compatibility - schema.check_compatible( + converted_schema.check_compatible( dataset.schema(), &SchemaCompareOptions { // We don't care if the user claims their data is nullable / non-nullable. We will @@ -407,7 +435,7 @@ pub async fn write_fragments_internal( )?; // Project from the dataset schema, because it has the correct field ids. let write_schema = dataset.schema().project_by_schema( - &schema, + &converted_schema, OnMissing::Error, OnTypeMismatch::Error, )?; @@ -428,13 +456,13 @@ pub async fn write_fragments_internal( .data_storage_format .lance_file_version()?, ); - (schema, data_storage_version) + (converted_schema, data_storage_version) } } } else { // Brand new dataset, use the schema from the data and the storage version // from the user or the default. - (schema, params.storage_version_or_default()) + (converted_schema, params.storage_version_or_default()) }; let data_schema = schema.project_by_schema(