From 2d79f96692000d62479ec6bfe58b7e78a2c61a73 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 27 Aug 2025 17:02:20 +0800 Subject: [PATCH 01/11] feat: add UDFs for json Signed-off-by: Xuanwo --- python/python/tests/test_json.py | 153 +++ rust/lance-arrow/src/json.rs | 3 +- rust/lance-datafusion/src/udf.rs | 9 + rust/lance-datafusion/src/udf/json.rs | 1336 ++++++++++++++++++++++++- 4 files changed, 1493 insertions(+), 8 deletions(-) diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index bf592bccc60..ef9d60b8c09 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -184,3 +184,156 @@ def test_json_batch_operations(): for batch in batches: assert batch.num_rows == batch_size + + +def test_json_path_queries(): + """Test JSON path queries using json_extract and json_exists.""" + # Create test data with JSON columns + json_data = [ + {"user": {"name": "Alice", "age": 30}, "tags": ["python", "ml"]}, + {"user": {"name": "Bob", "age": 25}, "tags": ["rust", "db"]}, + {"user": {"name": "Charlie"}, "tags": []}, + None, + ] + + json_strings = [json.dumps(d) if d is not None else None for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + # Create a Lance dataset with JSON data + table = pa.table({ + "id": [1, 2, 3, 4], + "data": json_arr, + }) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "json_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Test json_extract + result = dataset.to_table(filter="json_extract(data, '$.user.name') = '\"Alice\"'") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test json_exists + result = dataset.to_table(filter="json_exists(data, '$.user.age')") + assert result.num_rows == 2 # Alice and Bob have age field + + # Test json_array_contains + result = dataset.to_table(filter="json_array_contains(data, '$.tags', 'python')") + assert 
result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + +def test_json_get_functions(): + """Test json_get_* functions for type-safe value extraction.""" + # Create test data with various JSON types + json_data = [ + {"name": "Alice", "age": 30, "active": True, "score": 95.5}, + {"name": "Bob", "age": 25, "active": False, "score": 87.3}, + {"name": "Charlie", "age": "35", "active": "true", "score": "92"}, + {"name": "David"}, # Missing fields + ] + + json_strings = [json.dumps(d) for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + table = pa.table({ + "id": [1, 2, 3, 4], + "data": json_arr, + }) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "json_get_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Test json_get_string + result = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test json_get_int with type coercion + result = dataset.to_table(filter="json_get_int(data, 'age') > 28") + assert result.num_rows == 2 # Alice (30) and Charlie ("35" -> 35) + + # Test json_get_bool with type coercion + result = dataset.to_table(filter="json_get_bool(data, 'active') = true") + assert result.num_rows == 2 # Alice (true) and Charlie ("true" -> true) + + # Test json_get_float + result = dataset.to_table(filter="json_get_float(data, 'score') > 90") + assert result.num_rows == 2 # Alice (95.5) and Charlie ("92" -> 92.0) + + +def test_nested_json_access(): + """Test accessing nested JSON structures.""" + json_data = [ + {"user": {"profile": {"name": "Alice", "settings": {"theme": "dark"}}}}, + {"user": {"profile": {"name": "Bob", "settings": {"theme": "light"}}}}, + ] + + json_strings = [json.dumps(d) for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + table = pa.table({ + "id": [1, 2], + "data": json_arr, + }) + + with 
tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "nested_json_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Access nested fields using json_get recursively + # First get user, then profile, then name + result = dataset.to_table( + filter="json_get_string(json_get(json_get(data, 'user'), 'profile'), 'name') = 'Alice'" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Or use JSONPath for deep access + result = dataset.to_table( + filter="json_extract(data, '$.user.profile.settings.theme') = '\"dark\"'" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + +def test_json_array_operations(): + """Test JSON array operations.""" + json_data = [ + {"items": ["apple", "banana", "orange"], "counts": [1, 2, 3, 4, 5]}, + {"items": ["grape", "melon"], "counts": [10, 20]}, + {"items": [], "counts": []}, + ] + + json_strings = [json.dumps(d) for d in json_data] + json_arr = pa.array(json_strings, type=pa.json_()) + + table = pa.table({ + "id": [1, 2, 3], + "data": json_arr, + }) + + with tempfile.TemporaryDirectory() as tmpdir: + ds_path = Path(tmpdir) / "array_json_test.lance" + lance.write_dataset(table, ds_path, data_storage_version="2.2") + dataset = lance.dataset(ds_path) + + # Test array contains + result = dataset.to_table(filter="json_array_contains(data, '$.items', 'apple')") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test array length + result = dataset.to_table(filter="json_array_length(data, '$.counts') > 3") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + # Test empty array + result = dataset.to_table(filter="json_array_length(data, '$.items') = 0") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 3 diff --git a/rust/lance-arrow/src/json.rs b/rust/lance-arrow/src/json.rs index 244bb446504..226d48a0275 100644 --- a/rust/lance-arrow/src/json.rs +++ 
b/rust/lance-arrow/src/json.rs @@ -309,8 +309,9 @@ fn get_json_path( ) -> Result, Box> { let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); - match raw_jsonb.select_by_path(&json_path) { + match selector.select_values(&json_path) { Ok(values) => { if values.is_empty() { Ok(None) diff --git a/rust/lance-datafusion/src/udf.rs b/rust/lance-datafusion/src/udf.rs index 994d32cbe67..f19e59de487 100644 --- a/rust/lance-datafusion/src/udf.rs +++ b/rust/lance-datafusion/src/udf.rs @@ -15,7 +15,16 @@ mod json; /// Register UDF functions to datafusion context. pub fn register_functions(ctx: &SessionContext) { ctx.register_udf(CONTAINS_TOKENS_UDF.clone()); + // JSON functions ctx.register_udf(json::json_extract_udf()); + ctx.register_udf(json::json_exists_udf()); + ctx.register_udf(json::json_get_udf()); + ctx.register_udf(json::json_get_string_udf()); + ctx.register_udf(json::json_get_int_udf()); + ctx.register_udf(json::json_get_float_udf()); + ctx.register_udf(json::json_get_bool_udf()); + ctx.register_udf(json::json_array_contains_udf()); + ctx.register_udf(json::json_array_length_udf()); } /// This method checks whether a string contains another string. 
It utilizes FTS (Full-Text Search) diff --git a/rust/lance-datafusion/src/udf/json.rs b/rust/lance-datafusion/src/udf/json.rs index 1cfe9611af6..bdea6271789 100644 --- a/rust/lance-datafusion/src/udf/json.rs +++ b/rust/lance-datafusion/src/udf/json.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use arrow_array::builder::StringBuilder; +use arrow_array::builder::{BooleanBuilder, Float64Builder, Int64Builder, StringBuilder}; use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray}; use arrow_schema::DataType; use datafusion::error::Result; @@ -93,7 +93,8 @@ fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { })?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - match raw_jsonb.select_by_path(&json_path) { + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.select_values(&json_path) { Ok(values) => { if values.is_empty() { Ok(None) @@ -106,18 +107,1037 @@ fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { } } +/// Create the json_exists UDF for checking if a JSONPath exists +pub fn json_exists_udf() -> ScalarUDF { + create_udf( + "json_exists", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Boolean, + Volatility::Immutable, + Arc::new(json_exists_columnar_impl), + ) +} + +/// Implementation of json_exists function with ColumnarValue +fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_exists_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_exists function +fn json_exists_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( + "json_exists requires exactly 2 arguments".to_string(), + )); + } + + let 
jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let path_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = BooleanBuilder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || path_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let path = path_array.value(i); + + match check_json_path_exists(jsonb_bytes, path) { + Ok(exists) => builder.append_value(exists), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to check JSONPath existence: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Check if a JSONPath exists in JSONB +fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result { + let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { + datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) + })?; + + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.exists(&json_path) { + Ok(exists) => Ok(exists), + Err(_) => Ok(false), + } +} + +/// Create the json_get UDF for getting a field value as JSON string +pub fn json_get_udf() -> ScalarUDF { + create_udf( + "json_get", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::LargeBinary, + Volatility::Immutable, + Arc::new(json_get_columnar_impl), + ) +} + +/// Implementation of json_get function with ColumnarValue +fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + 
.collect(); + + let result = json_get_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get function +fn json_get_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( + "json_get requires exactly 2 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let key_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = arrow_array::builder::LargeBinaryBuilder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || key_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let key = key_array.value(i); + + match get_json_field(jsonb_bytes, key) { + Ok(Some(value)) => builder.append_value(value), + Ok(None) => builder.append_null(), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get JSON field: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Get a field value from JSONB (returns JSONB bytes) +fn get_json_field(jsonb_bytes: &[u8], key: &str) -> Result>> { + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + // Try as object field first + match raw_jsonb.get_by_name(key, false) { + Ok(Some(value)) => return Ok(Some(value.as_raw().as_ref().to_vec())), + Ok(None) => {} + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get field: {}", + e + ))) + } + } + + // Try as array index + if let Ok(index) = key.parse::() { + match raw_jsonb.get_by_index(index) { + Ok(Some(value)) => return Ok(Some(value.as_raw().as_ref().to_vec())), + Ok(None) => {} + Err(e) => { + return 
Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get array element: {}", + e + ))) + } + } + } + + Ok(None) +} + +/// Create the json_get_string UDF for getting a string value +pub fn json_get_string_udf() -> ScalarUDF { + create_udf( + "json_get_string", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Utf8, + Volatility::Immutable, + Arc::new(json_get_string_columnar_impl), + ) +} + +/// Implementation of json_get_string function with ColumnarValue +fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_get_string_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_string function +fn json_get_string_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( + "json_get_string requires exactly 2 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let key_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = StringBuilder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || key_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let key = key_array.value(i); + + match get_json_field_as_string(jsonb_bytes, key) { + Ok(Some(value)) => builder.append_value(&value), + Ok(None) => builder.append_null(), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get JSON string: {}", + e + ))); + } + } + } 
+ } + + Ok(Arc::new(builder.finish())) +} + +/// Get a field value as string with type coercion +fn get_json_field_as_string(jsonb_bytes: &[u8], key: &str) -> Result> { + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + // Try as object field first + let value = match raw_jsonb.get_by_name(key, false) { + Ok(Some(value)) => value, + Ok(None) => { + // Try as array index + if let Ok(index) = key.parse::() { + match raw_jsonb.get_by_index(index) { + Ok(Some(value)) => value, + Ok(None) => return Ok(None), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get array element: {}", + e + ))) + } + } + } else { + return Ok(None); + } + } + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get field: {}", + e + ))) + } + }; + + // Convert to string and inspect + let json_str = value.to_string(); + + // Check for null + if json_str == "null" { + return Ok(None); + } + + // Check if it's a string (starts and ends with quotes) + if json_str.starts_with('"') && json_str.ends_with('"') { + // Remove quotes + Ok(Some(json_str[1..json_str.len() - 1].to_string())) + } else if json_str == "true" || json_str == "false" { + // Boolean + Ok(Some(json_str)) + } else if json_str.starts_with('[') || json_str.starts_with('{') { + // Array or object - cannot convert to string + Err(datafusion::error::DataFusionError::Execution( + "Cannot convert JSON object or array to string".to_string(), + )) + } else { + // Number or other value + Ok(Some(json_str)) + } +} + +/// Create the json_get_int UDF for getting an integer value +pub fn json_get_int_udf() -> ScalarUDF { + create_udf( + "json_get_int", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Int64, + Volatility::Immutable, + Arc::new(json_get_int_columnar_impl), + ) +} + +/// Implementation of json_get_int function with ColumnarValue +fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + 
.map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_get_int_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_int function +fn json_get_int_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( + "json_get_int requires exactly 2 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let key_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = Int64Builder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || key_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let key = key_array.value(i); + + match get_json_field_as_int(jsonb_bytes, key) { + Ok(Some(value)) => builder.append_value(value), + Ok(None) => builder.append_null(), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get JSON int: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Get a field value as integer with type coercion +fn get_json_field_as_int(jsonb_bytes: &[u8], key: &str) -> Result> { + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + // Try as object field first + let value = match raw_jsonb.get_by_name(key, false) { + Ok(Some(value)) => value, + Ok(None) => { + // Try as array index + if let Ok(index) = key.parse::() { + match raw_jsonb.get_by_index(index) { + Ok(Some(value)) => value, + Ok(None) => return Ok(None), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get 
array element: {}", + e + ))) + } + } + } else { + return Ok(None); + } + } + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get field: {}", + e + ))) + } + }; + + // Convert to string and parse + let json_str = value.to_string(); + + // Check for null + if json_str == "null" { + return Ok(None); + } + + // Boolean conversion + if json_str == "true" { + return Ok(Some(1)); + } else if json_str == "false" { + return Ok(Some(0)); + } + + // String value (remove quotes) + let s = if json_str.starts_with('"') && json_str.ends_with('"') { + &json_str[1..json_str.len() - 1] + } else { + &json_str + }; + + // Try to parse as integer + if let Ok(n) = s.parse::() { + Ok(Some(n)) + } else if let Ok(f) = s.parse::() { + // Truncate float to int + Ok(Some(f as i64)) + } else if s.starts_with('[') || s.starts_with('{') { + Err(datafusion::error::DataFusionError::Execution( + "Cannot convert JSON object or array to integer".to_string(), + )) + } else { + Err(datafusion::error::DataFusionError::Execution(format!( + "Cannot convert string '{}' to integer", + s + ))) + } +} + +/// Create the json_get_float UDF for getting a float value +pub fn json_get_float_udf() -> ScalarUDF { + create_udf( + "json_get_float", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Float64, + Volatility::Immutable, + Arc::new(json_get_float_columnar_impl), + ) +} + +/// Implementation of json_get_float function with ColumnarValue +fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_get_float_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_float function +fn json_get_float_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( 
+ "json_get_float requires exactly 2 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let key_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = Float64Builder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || key_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let key = key_array.value(i); + + match get_json_field_as_float(jsonb_bytes, key) { + Ok(Some(value)) => builder.append_value(value), + Ok(None) => builder.append_null(), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get JSON float: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Get a field value as float with type coercion +fn get_json_field_as_float(jsonb_bytes: &[u8], key: &str) -> Result> { + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + // Try as object field first + let value = match raw_jsonb.get_by_name(key, false) { + Ok(Some(value)) => value, + Ok(None) => { + // Try as array index + if let Ok(index) = key.parse::() { + match raw_jsonb.get_by_index(index) { + Ok(Some(value)) => value, + Ok(None) => return Ok(None), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get array element: {}", + e + ))) + } + } + } else { + return Ok(None); + } + } + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get field: {}", + e + ))) + } + }; + + // Convert to string and parse + let json_str = value.to_string(); + + // Check for null + if json_str == "null" { + return Ok(None); + } + + // Boolean conversion + if json_str == "true" { + return 
Ok(Some(1.0)); + } else if json_str == "false" { + return Ok(Some(0.0)); + } + + // String value (remove quotes) + let s = if json_str.starts_with('"') && json_str.ends_with('"') { + &json_str[1..json_str.len() - 1] + } else { + &json_str + }; + + // Try to parse as float + s.parse::().map(Some).map_err(|_| { + if s.starts_with('[') || s.starts_with('{') { + datafusion::error::DataFusionError::Execution( + "Cannot convert JSON object or array to float".to_string(), + ) + } else { + datafusion::error::DataFusionError::Execution(format!( + "Cannot convert string '{}' to float", + s + )) + } + }) +} + +/// Create the json_get_bool UDF for getting a boolean value +pub fn json_get_bool_udf() -> ScalarUDF { + create_udf( + "json_get_bool", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Boolean, + Volatility::Immutable, + Arc::new(json_get_bool_columnar_impl), + ) +} + +/// Implementation of json_get_bool function with ColumnarValue +fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_get_bool_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_get_bool function +fn json_get_bool_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( + "json_get_bool requires exactly 2 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let key_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = BooleanBuilder::new(); + + for i in 
0..jsonb_array.len() { + if jsonb_array.is_null(i) || key_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let key = key_array.value(i); + + match get_json_field_as_bool(jsonb_bytes, key) { + Ok(Some(value)) => builder.append_value(value), + Ok(None) => builder.append_null(), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get JSON bool: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Get a field value as boolean with type coercion +fn get_json_field_as_bool(jsonb_bytes: &[u8], key: &str) -> Result> { + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + // Try as object field first + let value = match raw_jsonb.get_by_name(key, false) { + Ok(Some(value)) => value, + Ok(None) => { + // Try as array index + if let Ok(index) = key.parse::() { + match raw_jsonb.get_by_index(index) { + Ok(Some(value)) => value, + Ok(None) => return Ok(None), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get array element: {}", + e + ))) + } + } + } else { + return Ok(None); + } + } + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get field: {}", + e + ))) + } + }; + + // Convert to string and parse + let json_str = value.to_string(); + + // Check for null + if json_str == "null" { + return Ok(None); + } + + // Direct boolean + if json_str == "true" { + return Ok(Some(true)); + } else if json_str == "false" { + return Ok(Some(false)); + } + + // String value (remove quotes and check) + let s = if json_str.starts_with('"') && json_str.ends_with('"') { + json_str[1..json_str.len() - 1].to_lowercase() + } else { + json_str.to_lowercase() + }; + + // String to bool conversion + match s.as_str() { + "true" | "1" | "yes" | "y" | "on" => Ok(Some(true)), + "false" | "0" | "no" | "n" | "off" => Ok(Some(false)), + _ => { + // Try as number + if let Ok(n) = s.parse::() { + 
Ok(Some(n != 0)) + } else if let Ok(f) = s.parse::() { + Ok(Some(f != 0.0)) + } else if s.starts_with('[') || s.starts_with('{') { + Err(datafusion::error::DataFusionError::Execution( + "Cannot convert JSON object or array to boolean".to_string(), + )) + } else { + Err(datafusion::error::DataFusionError::Execution(format!( + "Cannot convert string '{}' to boolean", + s + ))) + } + } + } +} + +/// Create the json_array_contains UDF for checking if array contains a value +pub fn json_array_contains_udf() -> ScalarUDF { + create_udf( + "json_array_contains", + vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8], + DataType::Boolean, + Volatility::Immutable, + Arc::new(json_array_contains_columnar_impl), + ) +} + +/// Implementation of json_array_contains function with ColumnarValue +fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_array_contains_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_array_contains function +fn json_array_contains_impl(args: &[ArrayRef]) -> Result { + if args.len() != 3 { + return Err(datafusion::error::DataFusionError::Execution( + "json_array_contains requires exactly 3 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let path_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let value_array = args[2] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Third argument must be String".to_string(), + ) + })?; + + let 
mut builder = BooleanBuilder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || path_array.is_null(i) || value_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let path = path_array.value(i); + let value = value_array.value(i); + + match check_array_contains(jsonb_bytes, path, value) { + Ok(contains) => builder.append_value(contains), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to check array contains: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Check if a JSON array at path contains a value +fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result { + let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { + datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) + })?; + + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match selector.select_values(&json_path) { + Ok(values) => { + for v in values { + // Convert to raw JSONB for direct access + let raw = v.as_raw(); + // Check if it's an array by trying to iterate + let mut index = 0; + loop { + match raw.get_by_index(index) { + Ok(Some(elem)) => { + let elem_str = elem.to_string(); + // Compare as JSON strings (with quotes for strings) + if elem_str == value || elem_str == format!("\"{}\"", value) { + return Ok(true); + } + index += 1; + } + Ok(None) => break, // End of array + Err(_) => break, // Not an array or error + } + } + } + Ok(false) + } + Err(_) => Ok(false), + } +} + +/// Create the json_array_length UDF for getting array length +pub fn json_array_length_udf() -> ScalarUDF { + create_udf( + "json_array_length", + vec![DataType::LargeBinary, DataType::Utf8], + DataType::Int64, + Volatility::Immutable, + Arc::new(json_array_length_columnar_impl), + ) +} + +/// Implementation of json_array_length function with 
ColumnarValue +fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result { + let arrays: Vec = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect(); + + let result = json_array_length_impl(&arrays)?; + Ok(ColumnarValue::Array(result)) +} + +/// Implementation of json_array_length function +fn json_array_length_impl(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return Err(datafusion::error::DataFusionError::Execution( + "json_array_length requires exactly 2 arguments".to_string(), + )); + } + + let jsonb_array = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "First argument must be LargeBinary".to_string(), + ) + })?; + + let path_array = args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "Second argument must be String".to_string(), + ) + })?; + + let mut builder = Int64Builder::new(); + + for i in 0..jsonb_array.len() { + if jsonb_array.is_null(i) || path_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = jsonb_array.value(i); + let path = path_array.value(i); + + match get_array_length(jsonb_bytes, path) { + Ok(Some(len)) => builder.append_value(len), + Ok(None) => builder.append_null(), + Err(e) => { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Failed to get array length: {}", + e + ))); + } + } + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Get the length of a JSON array at path +fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result> { + let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { + datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) + })?; + + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); + match 
selector.select_values(&json_path) { + Ok(values) => { + if values.is_empty() { + return Ok(None); + } + let first = &values[0]; + let raw = first.as_raw(); + + // Count array elements by iterating + let mut count = 0; + loop { + match raw.get_by_index(count) { + Ok(Some(_)) => count += 1, + Ok(None) => break, // End of array + Err(_) => { + // Not an array + if count == 0 { + return Err(datafusion::error::DataFusionError::Execution(format!( + "Path does not point to an array: {}", + path + ))); + } + break; + } + } + } + Ok(Some(count as i64)) + } + Err(_) => Ok(None), + } +} + #[cfg(test)] mod tests { use super::*; use arrow_array::builder::LargeBinaryBuilder; + use arrow_array::{BooleanArray, Int64Array}; + + fn create_test_jsonb(json_str: &str) -> Vec { + jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec() + } #[tokio::test] - async fn test_variant_extract_udf() -> Result<()> { - // Create test JSONB data + async fn test_json_extract_udf() -> Result<()> { let json = r#"{"user": {"name": "Alice", "age": 30}}"#; - let jsonb_bytes = jsonb::parse_value(json.as_bytes()).unwrap().to_vec(); + let jsonb_bytes = create_test_jsonb(json); - // Create arrays let mut binary_builder = LargeBinaryBuilder::new(); binary_builder.append_value(&jsonb_bytes); binary_builder.append_value(&jsonb_bytes); @@ -130,7 +1150,6 @@ mod tests { Some("$.user.name"), ])); - // Call UDF let result = json_extract_impl(&[jsonb_array, path_array])?; let string_array = result.as_any().downcast_ref::().unwrap(); @@ -141,4 +1160,307 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_json_exists_udf() -> Result<()> { + let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = 
Arc::new(binary_builder.finish()); + let path_array = Arc::new(StringArray::from(vec![ + Some("$.user.name"), + Some("$.user.email"), + Some("$.tags"), + Some("$.any"), + ])); + + let result = json_exists_impl(&[jsonb_array, path_array])?; + let bool_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(bool_array.len(), 4); + assert!(bool_array.value(0)); + assert!(!bool_array.value(1)); + assert!(bool_array.value(2)); + assert!(bool_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_udf() -> Result<()> { + let json = r#"{"name": "Alice", "nested": {"value": 42}, "arr": [1, 2, 3]}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("name"), + Some("nested"), + Some("not_exists"), + Some("any"), + ])); + + let result = json_get_impl(&[jsonb_array, key_array])?; + let binary_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(binary_array.len(), 4); + assert!(!binary_array.is_null(0)); + assert!(!binary_array.is_null(1)); + assert!(binary_array.is_null(2)); + assert!(binary_array.is_null(3)); + + // Verify returned values are valid JSONB + let value0 = jsonb::RawJsonb::new(binary_array.value(0)); + assert_eq!(value0.to_string(), "\"Alice\""); + + let value1 = jsonb::RawJsonb::new(binary_array.value(1)); + assert!(value1.to_string().contains("\"value\":42")); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_string_udf() -> Result<()> { + // Test valid string conversions + let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + 
binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("str"), + Some("num"), + Some("bool"), + Some("null"), + ])); + + let result = json_get_string_impl(&[jsonb_array, key_array])?; + let string_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(string_array.len(), 4); + assert_eq!(string_array.value(0), "hello"); + assert_eq!(string_array.value(1), "123"); + assert_eq!(string_array.value(2), "true"); + assert!(string_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_string_error_on_object() -> Result<()> { + // Test that objects cannot be converted to string + let json = r#"{"obj": {}}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![Some("obj")])); + + let result = json_get_string_impl(&[jsonb_array, key_array]); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Cannot convert")); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_int_udf() -> Result<()> { + let json = r#"{"int": 42, "float": 3.14, "str_num": "99", "bool": true, "str": "abc"}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("int"), + Some("float"), + Some("str_num"), + Some("bool"), + ])); + + let result = 
json_get_int_impl(&[jsonb_array, key_array])?; + let int_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(int_array.len(), 4); + assert_eq!(int_array.value(0), 42); + assert_eq!(int_array.value(1), 3); // Truncated + assert_eq!(int_array.value(2), 99); + assert_eq!(int_array.value(3), 1); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_int_error() -> Result<()> { + let json = r#"{"str": "not_a_number"}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![Some("str")])); + + let result = json_get_int_impl(&[jsonb_array, key_array]); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Cannot convert string")); + + Ok(()) + } + + #[tokio::test] + async fn test_json_get_bool_udf() -> Result<()> { + let json = + r#"{"bool_true": true, "bool_false": false, "num": 1, "zero": 0, "str": "true"}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("bool_true"), + Some("bool_false"), + Some("num"), + Some("zero"), + Some("str"), + ])); + + let result = json_get_bool_impl(&[jsonb_array, key_array])?; + let bool_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(bool_array.len(), 5); + assert!(bool_array.value(0)); + assert!(!bool_array.value(1)); + assert!(bool_array.value(2)); + assert!(!bool_array.value(3)); + assert!(bool_array.value(4)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_contains_udf() 
-> Result<()> { + let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = Arc::new(binary_builder.finish()); + let path_array = Arc::new(StringArray::from(vec![ + Some("$.tags"), + Some("$.tags"), + Some("$.nums"), + Some("$.tags"), + ])); + let value_array = Arc::new(StringArray::from(vec![ + Some("rust"), + Some("python"), + Some("2"), + Some("any"), + ])); + + let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?; + let bool_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(bool_array.len(), 4); + assert!(bool_array.value(0)); + assert!(!bool_array.value(1)); + assert!(bool_array.value(2)); + assert!(bool_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_length_udf() -> Result<()> { + let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_null(); + + let jsonb_array = Arc::new(binary_builder.finish()); + let path_array = Arc::new(StringArray::from(vec![ + Some("$.empty"), + Some("$.tags"), + Some("$.nested.arr"), + Some("$.any"), + ])); + + let result = json_array_length_impl(&[jsonb_array, path_array])?; + let int_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(int_array.len(), 4); + assert_eq!(int_array.value(0), 0); + assert_eq!(int_array.value(1), 3); + assert_eq!(int_array.value(2), 2); + assert!(int_array.is_null(3)); + + Ok(()) + } + + #[tokio::test] + async fn 
test_json_array_access() -> Result<()> { + let json = r#"["first", "second", "third"]"#; + let jsonb_bytes = create_test_jsonb(json); + + let mut binary_builder = LargeBinaryBuilder::new(); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + binary_builder.append_value(&jsonb_bytes); + + let jsonb_array = Arc::new(binary_builder.finish()); + let key_array = Arc::new(StringArray::from(vec![ + Some("0"), + Some("1"), + Some("10"), // Out of bounds + ])); + + let result = json_get_string_impl(&[jsonb_array, key_array])?; + let string_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(string_array.len(), 3); + assert_eq!(string_array.value(0), "first"); + assert_eq!(string_array.value(1), "second"); + assert!(string_array.is_null(2)); + + Ok(()) + } } From 201520479df728ecf1e9c22862a1dc76d80e323b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 28 Aug 2025 17:08:46 +0800 Subject: [PATCH 02/11] Cleanup Signed-off-by: Xuanwo --- rust/lance-datafusion/src/udf/json.rs | 1184 ++++++++----------------- 1 file changed, 378 insertions(+), 806 deletions(-) diff --git a/rust/lance-datafusion/src/udf/json.rs b/rust/lance-datafusion/src/udf/json.rs index bdea6271789..eaf50a7ba99 100644 --- a/rust/lance-datafusion/src/udf/json.rs +++ b/rust/lance-datafusion/src/udf/json.rs @@ -1,16 +1,195 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use arrow_array::builder::{BooleanBuilder, Float64Builder, Int64Builder, StringBuilder}; +use arrow_array::builder::{ + BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, +}; use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray}; use arrow_schema::DataType; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datafusion::prelude::create_udf; use std::sync::Arc; +/// Common 
helper functions and types for JSON UDFs +mod common { + use super::*; + + /// Key type for JSON field access - optimizes field/index parsing + #[derive(Debug, Clone)] + pub enum KeyType { + Field(String), + Index(usize), + } + + impl KeyType { + /// Parse a key string into either a field name or array index (once per operation) + pub fn parse(key: &str) -> Self { + if let Ok(index) = key.parse::() { + Self::Index(index) + } else { + Self::Field(key.to_string()) + } + } + } + + /// Convert ColumnarValue arguments to ArrayRef vector + /// + /// Note: This implementation currently broadcasts scalars to arrays. + /// Future optimization: handle scalars directly without broadcasting + /// to improve performance for scalar inputs. + pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec { + args.iter() + .map(|arg| match arg { + ColumnarValue::Array(arr) => arr.clone(), + ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), + }) + .collect() + } + + /// Create DataFusionError for execution failures (simplified error wrapping) + pub fn execution_error(msg: impl Into) -> DataFusionError { + DataFusionError::Execution(msg.into()) + } + + /// Validate argument count for UDF + pub fn validate_arg_count( + args: &[ArrayRef], + expected: usize, + function_name: &str, + ) -> Result<()> { + if args.len() != expected { + return Err(execution_error(format!( + "{} requires exactly {} arguments", + function_name, expected + ))); + } + Ok(()) + } + + /// Extract and validate LargeBinaryArray from first argument + pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> { + args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| execution_error("First argument must be LargeBinary")) + } + + /// Extract and validate StringArray from specified argument + pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> { + args[arg_index] + .as_any() + .downcast_ref::() + .ok_or_else(|| execution_error(format!("Argument {} must be 
String", arg_index + 1))) + } + + /// Get JSON field/element using pre-parsed key type (avoids repeated parsing) + pub fn get_json_value_by_key( + raw_jsonb: &jsonb::RawJsonb, + key_type: &KeyType, + ) -> Result> { + match key_type { + KeyType::Field(field) => raw_jsonb + .get_by_name(field, false) + .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))), + KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| { + execution_error(format!("Failed to get array element [{}]: {}", index, e)) + }), + } + } + + /// Parse JSONPath with proper error handling (no false returns) + pub fn parse_json_path(path: &str) -> Result> { + jsonb::jsonpath::parse_json_path(path.as_bytes()) + .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e))) + } +} + +/// Convert JSONB value to string using jsonb's built-in serde (strict mode) +fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? + { + return Ok(None); + } + + // Use jsonb's built-in to_str() method - strict conversion + raw_jsonb + .to_str() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e))) +} + +/// Convert JSONB value to integer using jsonb's built-in serde (strict mode) +fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? 
+ { + return Ok(None); + } + + // Use jsonb's built-in to_i64() method - strict conversion + raw_jsonb + .to_i64() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e))) +} + +/// Convert JSONB value to float using jsonb's built-in serde (strict mode) +fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? + { + return Ok(None); + } + + // Use jsonb's built-in to_f64() method - strict conversion + raw_jsonb + .to_f64() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e))) +} + +/// Convert JSONB value to boolean using jsonb's built-in serde (strict mode) +fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result> { + let raw_jsonb = value.as_raw(); + + // Check for null first + if raw_jsonb + .is_null() + .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))? 
+ { + return Ok(None); + } + + // Use jsonb's built-in to_bool() method - strict conversion + raw_jsonb + .to_bool() + .map(Some) + .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e))) +} + /// Create the json_extract UDF for extracting JSONPath from JSONB data +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath expression as string (Utf8) +/// +/// # Returns +/// String representation of the extracted value, or null if path not found pub fn json_extract_udf() -> ScalarUDF { create_udf( "json_extract", @@ -23,45 +202,19 @@ pub fn json_extract_udf() -> ScalarUDF { /// Implementation of json_extract function with ColumnarValue fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_extract_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_extract function fn json_extract_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_extract requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_extract")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let path_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = StringBuilder::new(); + let mut builder = 
StringBuilder::with_capacity(jsonb_array.len(), 1024); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || path_array.is_null(i) { @@ -70,15 +223,9 @@ fn json_extract_impl(args: &[ArrayRef]) -> Result { let jsonb_bytes = jsonb_array.value(i); let path = path_array.value(i); - match extract_json_path(jsonb_bytes, path) { - Ok(Some(value)) => builder.append_value(&value), - Ok(None) => builder.append_null(), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to extract JSONPath: {}", - e - ))); - } + match extract_json_path(jsonb_bytes, path)? { + Some(value) => builder.append_value(&value), + None => builder.append_null(), } } } @@ -87,10 +234,10 @@ fn json_extract_impl(args: &[ArrayRef]) -> Result { } /// Extract value from JSONB using JSONPath +/// +/// Note: Uses `select_values` instead of the deprecated `select_by_path` method fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { - let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { - datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) - })?; + let json_path = common::parse_json_path(path)?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); @@ -103,11 +250,21 @@ fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result> { Ok(Some(values[0].to_string())) } } - Err(_) => Ok(None), // Path not found or error + Err(e) => Err(common::execution_error(format!( + "Failed to select values from path '{}': {}", + path, e + ))), } } /// Create the json_exists UDF for checking if a JSONPath exists +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath expression as string (Utf8) +/// +/// # Returns +/// Boolean indicating whether the path exists in the JSON data pub fn json_exists_udf() -> ScalarUDF { create_udf( "json_exists", @@ -120,45 +277,19 @@ pub fn json_exists_udf() -> 
ScalarUDF { /// Implementation of json_exists function with ColumnarValue fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_exists_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_exists function fn json_exists_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_exists requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_exists")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let path_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = BooleanBuilder::new(); + let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || path_array.is_null(i) { @@ -167,15 +298,8 @@ fn json_exists_impl(args: &[ArrayRef]) -> Result { let jsonb_bytes = jsonb_array.value(i); let path = path_array.value(i); - match check_json_path_exists(jsonb_bytes, path) { - Ok(exists) => builder.append_value(exists), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to check JSONPath existence: {}", - e - ))); - } - } + let exists = check_json_path_exists(jsonb_bytes, path)?; + builder.append_value(exists); } } @@ -184,19 +308,27 @@ fn json_exists_impl(args: &[ArrayRef]) -> 
Result { /// Check if a JSONPath exists in JSONB fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result { - let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { - datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) - })?; + let json_path = common::parse_json_path(path)?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); match selector.exists(&json_path) { Ok(exists) => Ok(exists), - Err(_) => Ok(false), + Err(e) => Err(common::execution_error(format!( + "Failed to check existence of path '{}': {}", + path, e + ))), } } /// Create the json_get UDF for getting a field value as JSON string +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Raw JSONB bytes of the field value, or null if not found pub fn json_get_udf() -> ScalarUDF { create_udf( "json_get", @@ -209,45 +341,19 @@ pub fn json_get_udf() -> ScalarUDF { /// Implementation of json_get function with ColumnarValue fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_get_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_get function fn json_get_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_get requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_get")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - 
datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let key_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = arrow_array::builder::LargeBinaryBuilder::new(); + let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || key_array.is_null(i) { @@ -255,16 +361,12 @@ fn json_get_impl(args: &[ArrayRef]) -> Result { } else { let jsonb_bytes = jsonb_array.value(i); let key = key_array.value(i); + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - match get_json_field(jsonb_bytes, key) { - Ok(Some(value)) => builder.append_value(value), - Ok(None) => builder.append_null(), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get JSON field: {}", - e - ))); - } + match common::get_json_value_by_key(&raw_jsonb, &key_type)? 
{ + Some(value) => builder.append_value(value.as_raw().as_ref()), + None => builder.append_null(), } } } @@ -272,40 +374,14 @@ fn json_get_impl(args: &[ArrayRef]) -> Result { Ok(Arc::new(builder.finish())) } -/// Get a field value from JSONB (returns JSONB bytes) -fn get_json_field(jsonb_bytes: &[u8], key: &str) -> Result>> { - let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - - // Try as object field first - match raw_jsonb.get_by_name(key, false) { - Ok(Some(value)) => return Ok(Some(value.as_raw().as_ref().to_vec())), - Ok(None) => {} - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get field: {}", - e - ))) - } - } - - // Try as array index - if let Ok(index) = key.parse::() { - match raw_jsonb.get_by_index(index) { - Ok(Some(value)) => return Ok(Some(value.as_raw().as_ref().to_vec())), - Ok(None) => {} - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get array element: {}", - e - ))) - } - } - } - - Ok(None) -} - /// Create the json_get_string UDF for getting a string value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// String value with type coercion (numbers/booleans converted to strings) pub fn json_get_string_udf() -> ScalarUDF { create_udf( "json_get_string", @@ -318,45 +394,19 @@ pub fn json_get_string_udf() -> ScalarUDF { /// Implementation of json_get_string function with ColumnarValue fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_get_string_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_get_string function fn 
json_get_string_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_get_string requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_get_string")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let key_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = StringBuilder::new(); + let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || key_array.is_null(i) { @@ -364,16 +414,15 @@ fn json_get_string_impl(args: &[ArrayRef]) -> Result { } else { let jsonb_bytes = jsonb_array.value(i); let key = key_array.value(i); - - match get_json_field_as_string(jsonb_bytes, key) { - Ok(Some(value)) => builder.append_value(&value), - Ok(None) => builder.append_null(), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get JSON string: {}", - e - ))); - } + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_string(value)? 
{ + Some(string_val) => builder.append_value(&string_val), + None => builder.append_null(), + }, + None => builder.append_null(), } } } @@ -381,65 +430,14 @@ fn json_get_string_impl(args: &[ArrayRef]) -> Result { Ok(Arc::new(builder.finish())) } -/// Get a field value as string with type coercion -fn get_json_field_as_string(jsonb_bytes: &[u8], key: &str) -> Result> { - let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - - // Try as object field first - let value = match raw_jsonb.get_by_name(key, false) { - Ok(Some(value)) => value, - Ok(None) => { - // Try as array index - if let Ok(index) = key.parse::() { - match raw_jsonb.get_by_index(index) { - Ok(Some(value)) => value, - Ok(None) => return Ok(None), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get array element: {}", - e - ))) - } - } - } else { - return Ok(None); - } - } - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get field: {}", - e - ))) - } - }; - - // Convert to string and inspect - let json_str = value.to_string(); - - // Check for null - if json_str == "null" { - return Ok(None); - } - - // Check if it's a string (starts and ends with quotes) - if json_str.starts_with('"') && json_str.ends_with('"') { - // Remove quotes - Ok(Some(json_str[1..json_str.len() - 1].to_string())) - } else if json_str == "true" || json_str == "false" { - // Boolean - Ok(Some(json_str)) - } else if json_str.starts_with('[') || json_str.starts_with('{') { - // Array or object - cannot convert to string - Err(datafusion::error::DataFusionError::Execution( - "Cannot convert JSON object or array to string".to_string(), - )) - } else { - // Number or other value - Ok(Some(json_str)) - } -} - /// Create the json_get_int UDF for getting an integer value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Integer 
value with type coercion (strings/floats/booleans converted to int) pub fn json_get_int_udf() -> ScalarUDF { create_udf( "json_get_int", @@ -452,45 +450,19 @@ pub fn json_get_int_udf() -> ScalarUDF { /// Implementation of json_get_int function with ColumnarValue fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_get_int_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_get_int function fn json_get_int_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_get_int requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_get_int")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let key_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = Int64Builder::new(); + let mut builder = Int64Builder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || key_array.is_null(i) { @@ -498,16 +470,15 @@ fn json_get_int_impl(args: &[ArrayRef]) -> Result { } else { let jsonb_bytes = jsonb_array.value(i); let key = key_array.value(i); - - match get_json_field_as_int(jsonb_bytes, key) { - Ok(Some(value)) => builder.append_value(value), - Ok(None) => builder.append_null(), - Err(e) => { - return 
Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get JSON int: {}", - e - ))); - } + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_int(value)? { + Some(int_val) => builder.append_value(int_val), + None => builder.append_null(), + }, + None => builder.append_null(), } } } @@ -515,79 +486,14 @@ fn json_get_int_impl(args: &[ArrayRef]) -> Result { Ok(Arc::new(builder.finish())) } -/// Get a field value as integer with type coercion -fn get_json_field_as_int(jsonb_bytes: &[u8], key: &str) -> Result> { - let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - - // Try as object field first - let value = match raw_jsonb.get_by_name(key, false) { - Ok(Some(value)) => value, - Ok(None) => { - // Try as array index - if let Ok(index) = key.parse::() { - match raw_jsonb.get_by_index(index) { - Ok(Some(value)) => value, - Ok(None) => return Ok(None), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get array element: {}", - e - ))) - } - } - } else { - return Ok(None); - } - } - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get field: {}", - e - ))) - } - }; - - // Convert to string and parse - let json_str = value.to_string(); - - // Check for null - if json_str == "null" { - return Ok(None); - } - - // Boolean conversion - if json_str == "true" { - return Ok(Some(1)); - } else if json_str == "false" { - return Ok(Some(0)); - } - - // String value (remove quotes) - let s = if json_str.starts_with('"') && json_str.ends_with('"') { - &json_str[1..json_str.len() - 1] - } else { - &json_str - }; - - // Try to parse as integer - if let Ok(n) = s.parse::() { - Ok(Some(n)) - } else if let Ok(f) = s.parse::() { - // Truncate float to int - Ok(Some(f as i64)) - } else if s.starts_with('[') || s.starts_with('{') { - 
Err(datafusion::error::DataFusionError::Execution( - "Cannot convert JSON object or array to integer".to_string(), - )) - } else { - Err(datafusion::error::DataFusionError::Execution(format!( - "Cannot convert string '{}' to integer", - s - ))) - } -} - /// Create the json_get_float UDF for getting a float value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Float value with type coercion (strings/integers/booleans converted to float) pub fn json_get_float_udf() -> ScalarUDF { create_udf( "json_get_float", @@ -600,45 +506,19 @@ pub fn json_get_float_udf() -> ScalarUDF { /// Implementation of json_get_float function with ColumnarValue fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_get_float_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_get_float function fn json_get_float_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_get_float requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_get_float")?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let key_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = Float64Builder::new(); + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; 
+ + let mut builder = Float64Builder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || key_array.is_null(i) { @@ -646,16 +526,15 @@ fn json_get_float_impl(args: &[ArrayRef]) -> Result { } else { let jsonb_bytes = jsonb_array.value(i); let key = key_array.value(i); - - match get_json_field_as_float(jsonb_bytes, key) { - Ok(Some(value)) => builder.append_value(value), - Ok(None) => builder.append_null(), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get JSON float: {}", - e - ))); - } + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_float(value)? { + Some(float_val) => builder.append_value(float_val), + None => builder.append_null(), + }, + None => builder.append_null(), } } } @@ -663,76 +542,14 @@ fn json_get_float_impl(args: &[ArrayRef]) -> Result { Ok(Arc::new(builder.finish())) } -/// Get a field value as float with type coercion -fn get_json_field_as_float(jsonb_bytes: &[u8], key: &str) -> Result> { - let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - - // Try as object field first - let value = match raw_jsonb.get_by_name(key, false) { - Ok(Some(value)) => value, - Ok(None) => { - // Try as array index - if let Ok(index) = key.parse::() { - match raw_jsonb.get_by_index(index) { - Ok(Some(value)) => value, - Ok(None) => return Ok(None), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get array element: {}", - e - ))) - } - } - } else { - return Ok(None); - } - } - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get field: {}", - e - ))) - } - }; - - // Convert to string and parse - let json_str = value.to_string(); - - // Check for null - if json_str == "null" { - return Ok(None); - } - - // Boolean conversion - if json_str == 
"true" { - return Ok(Some(1.0)); - } else if json_str == "false" { - return Ok(Some(0.0)); - } - - // String value (remove quotes) - let s = if json_str.starts_with('"') && json_str.ends_with('"') { - &json_str[1..json_str.len() - 1] - } else { - &json_str - }; - - // Try to parse as float - s.parse::().map(Some).map_err(|_| { - if s.starts_with('[') || s.starts_with('{') { - datafusion::error::DataFusionError::Execution( - "Cannot convert JSON object or array to float".to_string(), - ) - } else { - datafusion::error::DataFusionError::Execution(format!( - "Cannot convert string '{}' to float", - s - )) - } - }) -} - /// Create the json_get_bool UDF for getting a boolean value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: Field name or array index as string (Utf8) +/// +/// # Returns +/// Boolean value with flexible type coercion (strings like 'true'/'yes'/'1' become true) pub fn json_get_bool_udf() -> ScalarUDF { create_udf( "json_get_bool", @@ -745,45 +562,19 @@ pub fn json_get_bool_udf() -> ScalarUDF { /// Implementation of json_get_bool function with ColumnarValue fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_get_bool_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_get_bool function fn json_get_bool_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_get_bool requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_get_bool")?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be 
LargeBinary".to_string(), - ) - })?; - - let key_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = BooleanBuilder::new(); + let jsonb_array = common::extract_jsonb_array(args)?; + let key_array = common::extract_string_array(args, 1)?; + + let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || key_array.is_null(i) { @@ -791,16 +582,15 @@ fn json_get_bool_impl(args: &[ArrayRef]) -> Result { } else { let jsonb_bytes = jsonb_array.value(i); let key = key_array.value(i); - - match get_json_field_as_bool(jsonb_bytes, key) { - Ok(Some(value)) => builder.append_value(value), - Ok(None) => builder.append_null(), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get JSON bool: {}", - e - ))); - } + let key_type = common::KeyType::parse(key); + let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); + + match common::get_json_value_by_key(&raw_jsonb, &key_type)? { + Some(value) => match json_value_to_bool(value)? 
{ + Some(bool_val) => builder.append_value(bool_val), + None => builder.append_null(), + }, + None => builder.append_null(), } } } @@ -808,85 +598,15 @@ fn json_get_bool_impl(args: &[ArrayRef]) -> Result { Ok(Arc::new(builder.finish())) } -/// Get a field value as boolean with type coercion -fn get_json_field_as_bool(jsonb_bytes: &[u8], key: &str) -> Result> { - let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); - - // Try as object field first - let value = match raw_jsonb.get_by_name(key, false) { - Ok(Some(value)) => value, - Ok(None) => { - // Try as array index - if let Ok(index) = key.parse::() { - match raw_jsonb.get_by_index(index) { - Ok(Some(value)) => value, - Ok(None) => return Ok(None), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get array element: {}", - e - ))) - } - } - } else { - return Ok(None); - } - } - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get field: {}", - e - ))) - } - }; - - // Convert to string and parse - let json_str = value.to_string(); - - // Check for null - if json_str == "null" { - return Ok(None); - } - - // Direct boolean - if json_str == "true" { - return Ok(Some(true)); - } else if json_str == "false" { - return Ok(Some(false)); - } - - // String value (remove quotes and check) - let s = if json_str.starts_with('"') && json_str.ends_with('"') { - json_str[1..json_str.len() - 1].to_lowercase() - } else { - json_str.to_lowercase() - }; - - // String to bool conversion - match s.as_str() { - "true" | "1" | "yes" | "y" | "on" => Ok(Some(true)), - "false" | "0" | "no" | "n" | "off" => Ok(Some(false)), - _ => { - // Try as number - if let Ok(n) = s.parse::() { - Ok(Some(n != 0)) - } else if let Ok(f) = s.parse::() { - Ok(Some(f != 0.0)) - } else if s.starts_with('[') || s.starts_with('{') { - Err(datafusion::error::DataFusionError::Execution( - "Cannot convert JSON object or array to boolean".to_string(), - )) - } else { - 
Err(datafusion::error::DataFusionError::Execution(format!( - "Cannot convert string '{}' to boolean", - s - ))) - } - } - } -} - /// Create the json_array_contains UDF for checking if array contains a value +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath to array location (Utf8) +/// * Third parameter: Value to search for as string (Utf8) +/// +/// # Returns +/// Boolean indicating whether the array contains the specified value pub fn json_array_contains_udf() -> ScalarUDF { create_udf( "json_array_contains", @@ -899,54 +619,20 @@ pub fn json_array_contains_udf() -> ScalarUDF { /// Implementation of json_array_contains function with ColumnarValue fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_array_contains_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_array_contains function fn json_array_contains_impl(args: &[ArrayRef]) -> Result { - if args.len() != 3 { - return Err(datafusion::error::DataFusionError::Execution( - "json_array_contains requires exactly 3 arguments".to_string(), - )); - } + common::validate_arg_count(args, 3, "json_array_contains")?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let path_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let value_array = args[2] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Third argument must be 
String".to_string(), - ) - })?; - - let mut builder = BooleanBuilder::new(); + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; + let value_array = common::extract_string_array(args, 2)?; + + let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || path_array.is_null(i) || value_array.is_null(i) { @@ -956,15 +642,8 @@ fn json_array_contains_impl(args: &[ArrayRef]) -> Result { let path = path_array.value(i); let value = value_array.value(i); - match check_array_contains(jsonb_bytes, path, value) { - Ok(contains) => builder.append_value(contains), - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to check array contains: {}", - e - ))); - } - } + let contains = check_array_contains(jsonb_bytes, path, value)?; + builder.append_value(contains); } } @@ -973,9 +652,7 @@ fn json_array_contains_impl(args: &[ArrayRef]) -> Result { /// Check if a JSON array at path contains a value fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result { - let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { - datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) - })?; + let json_path = common::parse_json_path(path)?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); @@ -1003,11 +680,21 @@ fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result Ok(false), + Err(e) => Err(common::execution_error(format!( + "Failed to check array contains at path '{}': {}", + path, e + ))), } } /// Create the json_array_length UDF for getting array length +/// +/// # Arguments +/// * First parameter: JSONB binary data (LargeBinary) +/// * Second parameter: JSONPath to array location (Utf8) +/// +/// # Returns +/// Integer length of the JSON array, or null if path doesn't 
point to an array pub fn json_array_length_udf() -> ScalarUDF { create_udf( "json_array_length", @@ -1020,45 +707,19 @@ pub fn json_array_length_udf() -> ScalarUDF { /// Implementation of json_array_length function with ColumnarValue fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result { - let arrays: Vec = args - .iter() - .map(|arg| match arg { - ColumnarValue::Array(arr) => arr.clone(), - ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(), - }) - .collect(); - + let arrays = common::columnar_to_arrays(args); let result = json_array_length_impl(&arrays)?; Ok(ColumnarValue::Array(result)) } /// Implementation of json_array_length function fn json_array_length_impl(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return Err(datafusion::error::DataFusionError::Execution( - "json_array_length requires exactly 2 arguments".to_string(), - )); - } + common::validate_arg_count(args, 2, "json_array_length")?; + + let jsonb_array = common::extract_jsonb_array(args)?; + let path_array = common::extract_string_array(args, 1)?; - let jsonb_array = args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "First argument must be LargeBinary".to_string(), - ) - })?; - - let path_array = args[1] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion::error::DataFusionError::Execution( - "Second argument must be String".to_string(), - ) - })?; - - let mut builder = Int64Builder::new(); + let mut builder = Int64Builder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) || path_array.is_null(i) { @@ -1067,15 +728,9 @@ fn json_array_length_impl(args: &[ArrayRef]) -> Result { let jsonb_bytes = jsonb_array.value(i); let path = path_array.value(i); - match get_array_length(jsonb_bytes, path) { - Ok(Some(len)) => builder.append_value(len), - Ok(None) => builder.append_null(), - Err(e) => { - return 
Err(datafusion::error::DataFusionError::Execution(format!( - "Failed to get array length: {}", - e - ))); - } + match get_array_length(jsonb_bytes, path)? { + Some(len) => builder.append_value(len), + None => builder.append_null(), } } } @@ -1085,9 +740,7 @@ fn json_array_length_impl(args: &[ArrayRef]) -> Result { /// Get the length of a JSON array at path fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result> { - let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).map_err(|e| { - datafusion::error::DataFusionError::Execution(format!("Invalid JSONPath: {}", e)) - })?; + let json_path = common::parse_json_path(path)?; let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb); @@ -1108,8 +761,8 @@ fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result> { Err(_) => { // Not an array if count == 0 { - return Err(datafusion::error::DataFusionError::Execution(format!( - "Path does not point to an array: {}", + return Err(common::execution_error(format!( + "Path '{}' does not point to an array", path ))); } @@ -1119,7 +772,10 @@ fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result> { } Ok(Some(count as i64)) } - Err(_) => Ok(None), + Err(e) => Err(common::execution_error(format!( + "Failed to get array length at path '{}': {}", + path, e + ))), } } @@ -1192,44 +848,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_json_get_udf() -> Result<()> { - let json = r#"{"name": "Alice", "nested": {"value": 42}, "arr": [1, 2, 3]}"#; - let jsonb_bytes = create_test_jsonb(json); - - let mut binary_builder = LargeBinaryBuilder::new(); - binary_builder.append_value(&jsonb_bytes); - binary_builder.append_value(&jsonb_bytes); - binary_builder.append_value(&jsonb_bytes); - binary_builder.append_null(); - - let jsonb_array = Arc::new(binary_builder.finish()); - let key_array = Arc::new(StringArray::from(vec![ - Some("name"), - Some("nested"), - Some("not_exists"), - Some("any"), - ])); - - 
let result = json_get_impl(&[jsonb_array, key_array])?; - let binary_array = result.as_any().downcast_ref::().unwrap(); - - assert_eq!(binary_array.len(), 4); - assert!(!binary_array.is_null(0)); - assert!(!binary_array.is_null(1)); - assert!(binary_array.is_null(2)); - assert!(binary_array.is_null(3)); - - // Verify returned values are valid JSONB - let value0 = jsonb::RawJsonb::new(binary_array.value(0)); - assert_eq!(value0.to_string(), "\"Alice\""); - - let value1 = jsonb::RawJsonb::new(binary_array.value(1)); - assert!(value1.to_string().contains("\"value\":42")); - - Ok(()) - } - #[tokio::test] async fn test_json_get_string_udf() -> Result<()> { // Test valid string conversions @@ -1262,40 +880,19 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_json_get_string_error_on_object() -> Result<()> { - // Test that objects cannot be converted to string - let json = r#"{"obj": {}}"#; - let jsonb_bytes = create_test_jsonb(json); - - let mut binary_builder = LargeBinaryBuilder::new(); - binary_builder.append_value(&jsonb_bytes); - - let jsonb_array = Arc::new(binary_builder.finish()); - let key_array = Arc::new(StringArray::from(vec![Some("obj")])); - - let result = json_get_string_impl(&[jsonb_array, key_array]); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Cannot convert")); - - Ok(()) - } - #[tokio::test] async fn test_json_get_int_udf() -> Result<()> { - let json = r#"{"int": 42, "float": 3.14, "str_num": "99", "bool": true, "str": "abc"}"#; + let json = r#"{"int": 42, "str_num": "99", "bool": true}"#; let jsonb_bytes = create_test_jsonb(json); let mut binary_builder = LargeBinaryBuilder::new(); binary_builder.append_value(&jsonb_bytes); binary_builder.append_value(&jsonb_bytes); binary_builder.append_value(&jsonb_bytes); - binary_builder.append_value(&jsonb_bytes); let jsonb_array = Arc::new(binary_builder.finish()); let key_array = Arc::new(StringArray::from(vec![ Some("int"), - Some("float"), Some("str_num"), 
Some("bool"), ])); @@ -1303,32 +900,10 @@ mod tests { let result = json_get_int_impl(&[jsonb_array, key_array])?; let int_array = result.as_any().downcast_ref::().unwrap(); - assert_eq!(int_array.len(), 4); + assert_eq!(int_array.len(), 3); assert_eq!(int_array.value(0), 42); - assert_eq!(int_array.value(1), 3); // Truncated - assert_eq!(int_array.value(2), 99); - assert_eq!(int_array.value(3), 1); - - Ok(()) - } - - #[tokio::test] - async fn test_json_get_int_error() -> Result<()> { - let json = r#"{"str": "not_a_number"}"#; - let jsonb_bytes = create_test_jsonb(json); - - let mut binary_builder = LargeBinaryBuilder::new(); - binary_builder.append_value(&jsonb_bytes); - - let jsonb_array = Arc::new(binary_builder.finish()); - let key_array = Arc::new(StringArray::from(vec![Some("str")])); - - let result = json_get_int_impl(&[jsonb_array, key_array]); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Cannot convert string")); + assert_eq!(int_array.value(1), 99); + assert_eq!(int_array.value(2), 1); // jsonb converts true to 1 Ok(()) } @@ -1336,7 +911,7 @@ mod tests { #[tokio::test] async fn test_json_get_bool_udf() -> Result<()> { let json = - r#"{"bool_true": true, "bool_false": false, "num": 1, "zero": 0, "str": "true"}"#; + r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#; let jsonb_bytes = create_test_jsonb(json); let mut binary_builder = LargeBinaryBuilder::new(); @@ -1344,26 +919,23 @@ mod tests { binary_builder.append_value(&jsonb_bytes); binary_builder.append_value(&jsonb_bytes); binary_builder.append_value(&jsonb_bytes); - binary_builder.append_value(&jsonb_bytes); let jsonb_array = Arc::new(binary_builder.finish()); let key_array = Arc::new(StringArray::from(vec![ Some("bool_true"), Some("bool_false"), - Some("num"), - Some("zero"), - Some("str"), + Some("str_true"), + Some("str_false"), ])); let result = json_get_bool_impl(&[jsonb_array, key_array])?; let bool_array = 
result.as_any().downcast_ref::().unwrap(); - assert_eq!(bool_array.len(), 5); + assert_eq!(bool_array.len(), 4); assert!(bool_array.value(0)); assert!(!bool_array.value(1)); - assert!(bool_array.value(2)); - assert!(!bool_array.value(3)); - assert!(bool_array.value(4)); + assert!(bool_array.value(2)); // "true" string converts to true + assert!(!bool_array.value(3)); // "false" string converts to false Ok(()) } From f6759ed6add58bd95d0b5ec82016155e30a1d539 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 28 Aug 2025 17:18:37 +0800 Subject: [PATCH 03/11] Fix python lint Signed-off-by: Xuanwo --- python/python/tests/test_json.py | 108 ++++++++++++++++++------------- 1 file changed, 64 insertions(+), 44 deletions(-) diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index ef9d60b8c09..3cb88760eb8 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -195,32 +195,38 @@ def test_json_path_queries(): {"user": {"name": "Charlie"}, "tags": []}, None, ] - + json_strings = [json.dumps(d) if d is not None else None for d in json_data] json_arr = pa.array(json_strings, type=pa.json_()) - + # Create a Lance dataset with JSON data - table = pa.table({ - "id": [1, 2, 3, 4], - "data": json_arr, - }) - + table = pa.table( + { + "id": [1, 2, 3, 4], + "data": json_arr, + } + ) + with tempfile.TemporaryDirectory() as tmpdir: ds_path = Path(tmpdir) / "json_test.lance" lance.write_dataset(table, ds_path, data_storage_version="2.2") dataset = lance.dataset(ds_path) - + # Test json_extract - result = dataset.to_table(filter="json_extract(data, '$.user.name') = '\"Alice\"'") + result = dataset.to_table( + filter="json_extract(data, '$.user.name') = '\"Alice\"'" + ) assert result.num_rows == 1 assert result["id"][0].as_py() == 1 - + # Test json_exists result = dataset.to_table(filter="json_exists(data, '$.user.age')") assert result.num_rows == 2 # Alice and Bob have age field - + # Test json_array_contains - result = 
dataset.to_table(filter="json_array_contains(data, '$.tags', 'python')") + result = dataset.to_table( + filter="json_array_contains(data, '$.tags', 'python')" + ) assert result.num_rows == 1 assert result["id"][0].as_py() == 1 @@ -234,33 +240,35 @@ def test_json_get_functions(): {"name": "Charlie", "age": "35", "active": "true", "score": "92"}, {"name": "David"}, # Missing fields ] - + json_strings = [json.dumps(d) for d in json_data] json_arr = pa.array(json_strings, type=pa.json_()) - - table = pa.table({ - "id": [1, 2, 3, 4], - "data": json_arr, - }) - + + table = pa.table( + { + "id": [1, 2, 3, 4], + "data": json_arr, + } + ) + with tempfile.TemporaryDirectory() as tmpdir: ds_path = Path(tmpdir) / "json_get_test.lance" lance.write_dataset(table, ds_path, data_storage_version="2.2") dataset = lance.dataset(ds_path) - + # Test json_get_string result = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'") assert result.num_rows == 1 assert result["id"][0].as_py() == 1 - + # Test json_get_int with type coercion result = dataset.to_table(filter="json_get_int(data, 'age') > 28") assert result.num_rows == 2 # Alice (30) and Charlie ("35" -> 35) - + # Test json_get_bool with type coercion result = dataset.to_table(filter="json_get_bool(data, 'active') = true") assert result.num_rows == 2 # Alice (true) and Charlie ("true" -> true) - + # Test json_get_float result = dataset.to_table(filter="json_get_float(data, 'score') > 90") assert result.num_rows == 2 # Alice (95.5) and Charlie ("92" -> 92.0) @@ -272,28 +280,36 @@ def test_nested_json_access(): {"user": {"profile": {"name": "Alice", "settings": {"theme": "dark"}}}}, {"user": {"profile": {"name": "Bob", "settings": {"theme": "light"}}}}, ] - + json_strings = [json.dumps(d) for d in json_data] json_arr = pa.array(json_strings, type=pa.json_()) - - table = pa.table({ - "id": [1, 2], - "data": json_arr, - }) - + + table = pa.table( + { + "id": [1, 2], + "data": json_arr, + } + ) + with 
tempfile.TemporaryDirectory() as tmpdir: ds_path = Path(tmpdir) / "nested_json_test.lance" lance.write_dataset(table, ds_path, data_storage_version="2.2") dataset = lance.dataset(ds_path) - + # Access nested fields using json_get recursively # First get user, then profile, then name result = dataset.to_table( - filter="json_get_string(json_get(json_get(data, 'user'), 'profile'), 'name') = 'Alice'" + filter=""" + json_get_string( + json_get( + json_get(data, 'user'), + 'profile'), + 'name') + = 'Alice'""" ) assert result.num_rows == 1 assert result["id"][0].as_py() == 1 - + # Or use JSONPath for deep access result = dataset.to_table( filter="json_extract(data, '$.user.profile.settings.theme') = '\"dark\"'" @@ -309,30 +325,34 @@ def test_json_array_operations(): {"items": ["grape", "melon"], "counts": [10, 20]}, {"items": [], "counts": []}, ] - + json_strings = [json.dumps(d) for d in json_data] json_arr = pa.array(json_strings, type=pa.json_()) - - table = pa.table({ - "id": [1, 2, 3], - "data": json_arr, - }) - + + table = pa.table( + { + "id": [1, 2, 3], + "data": json_arr, + } + ) + with tempfile.TemporaryDirectory() as tmpdir: ds_path = Path(tmpdir) / "array_json_test.lance" lance.write_dataset(table, ds_path, data_storage_version="2.2") dataset = lance.dataset(ds_path) - + # Test array contains - result = dataset.to_table(filter="json_array_contains(data, '$.items', 'apple')") + result = dataset.to_table( + filter="json_array_contains(data, '$.items', 'apple')" + ) assert result.num_rows == 1 assert result["id"][0].as_py() == 1 - + # Test array length result = dataset.to_table(filter="json_array_length(data, '$.counts') > 3") assert result.num_rows == 1 assert result["id"][0].as_py() == 1 - + # Test empty array result = dataset.to_table(filter="json_array_length(data, '$.items') = 0") assert result.num_rows == 1 From d8045a2e17b42cca2f9bdbb86dfcc1fa7bf450e5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 28 Aug 2025 17:45:33 +0800 Subject: [PATCH 04/11] Fix 
UDF register Signed-off-by: Xuanwo --- rust/lance-datafusion/src/planner.rs | 14 +++++++++----- rust/lance-datafusion/src/udf.rs | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/rust/lance-datafusion/src/planner.rs b/rust/lance-datafusion/src/planner.rs index a654a8d653f..f231948f1e4 100644 --- a/rust/lance-datafusion/src/planner.rs +++ b/rust/lance-datafusion/src/planner.rs @@ -20,7 +20,7 @@ use datafusion::common::DFSchema; use datafusion::config::ConfigOptions; use datafusion::error::Result as DFResult; use datafusion::execution::config::SessionConfig; -use datafusion::execution::context::SessionState; +use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::expr::ScalarFunction; @@ -162,20 +162,24 @@ impl Default for LanceContextProvider { fn default() -> Self { let config = SessionConfig::new(); let runtime = RuntimeEnvBuilder::new().build_arc().unwrap(); + + let ctx = SessionContext::new_with_config_rt(config.clone(), runtime.clone()); + crate::udf::register_functions(&ctx); + + let state = ctx.state(); + + // SessionState does not expose expr_planners, so we need to get them separately let mut state_builder = SessionStateBuilder::new() .with_config(config) .with_runtime_env(runtime) .with_default_features(); - // SessionState does not expose expr_planners, so we need to get the default ones from - // the builder and store them to return from get_expr_planners - // unwrap safe because with_default_features sets expr_planners let expr_planners = state_builder.expr_planners().as_ref().unwrap().clone(); Self { options: ConfigOptions::default(), - state: state_builder.build(), + state, expr_planners, } } diff --git a/rust/lance-datafusion/src/udf.rs b/rust/lance-datafusion/src/udf.rs index f19e59de487..c5073756a15 100644 --- a/rust/lance-datafusion/src/udf.rs +++ 
b/rust/lance-datafusion/src/udf.rs @@ -10,7 +10,7 @@ use datafusion::prelude::SessionContext; use datafusion_functions::utils::make_scalar_function; use std::sync::{Arc, LazyLock}; -mod json; +pub(crate) mod json; /// Register UDF functions to datafusion context. pub fn register_functions(ctx: &SessionContext) { From 69ecb54615a41b20dd10e9ba8c1930ef8e6cc461 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 01:47:43 +0800 Subject: [PATCH 05/11] Correct tests Signed-off-by: Xuanwo --- python/python/tests/test_json.py | 34 +++++++++++++++----------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index 3cb88760eb8..6ac046842ee 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -40,35 +40,33 @@ def test_json_basic_write_read(): # Read back the dataset dataset = lance.dataset(dataset_path) - # Verify schema + # Verify storage schema assert len(dataset.schema) == 2 assert dataset.schema.field("id").type == pa.int32() - # Check that JSON field is properly recognized - json_field_schema = dataset.schema.field("data") - # Should be PyArrow JSON type - assert json_field_schema.type == pa.json_() + # Check that JSON field is stored as JSONB internally + storage_field = dataset.schema.field("data") + assert storage_field.type == pa.large_binary() + assert storage_field.metadata is not None + assert b"ARROW:extension:name" in storage_field.metadata + assert storage_field.metadata[b"ARROW:extension:name"] == b"lance.json" # Read data back result_table = dataset.to_table() + # Check that data is returned as Arrow JSON for Python + result_field = result_table.schema.field("data") + # PyArrow extension types print as extension but + # the storage type is utf8 + assert ( + str(result_field.type) == "extension" + or result_field.type == pa.utf8() + ) + # Verify data assert result_table.num_rows == 5 assert result_table.column("id").to_pylist() == 
[1, 2, 3, 4, 5] - # Verify the data column is JSON type - data_column = result_table.column("data") - assert data_column.type == pa.json_() - - # Verify the JSON data is correctly preserved - for i, expected in enumerate(json_strings): - actual = data_column[i].as_py() - if expected is None: - assert actual is None - else: - # Arrow JSON returns strings, verify they match - assert actual == expected - def test_json_with_other_types(): """Test JSON type alongside other data types.""" From c2c71e0dbcfdfda88c0788c6a883479d648a35de Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 02:09:55 +0800 Subject: [PATCH 06/11] Fix JSONB convert Signed-off-by: Xuanwo --- rust/lance-arrow/src/json.rs | 218 ++++++++++++++++++++++++- rust/lance-core/src/datatypes/field.rs | 9 +- rust/lance-datafusion/src/udf/json.rs | 99 ++++++----- rust/lance/src/dataset/scanner.rs | 7 + rust/lance/src/dataset/utils.rs | 119 +++++++++++++- rust/lance/src/dataset/write.rs | 25 ++- 6 files changed, 425 insertions(+), 52 deletions(-) diff --git a/rust/lance-arrow/src/json.rs b/rust/lance-arrow/src/json.rs index 226d48a0275..2b5977f8cf8 100644 --- a/rust/lance-arrow/src/json.rs +++ b/rust/lance-arrow/src/json.rs @@ -7,15 +7,18 @@ use std::convert::TryFrom; use std::sync::Arc; use arrow_array::builder::LargeBinaryBuilder; -use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, StringArray}; +use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray}; use arrow_data::ArrayData; -use arrow_schema::{ArrowError, DataType, Field as ArrowField}; +use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema}; use crate::ARROW_EXT_NAME_KEY; -/// Arrow extension type name for JSON data +/// Arrow extension type name for JSON data (Lance internal) pub const JSON_EXT_NAME: &str = "lance.json"; +/// Arrow extension type name for JSON data (Arrow official) +pub const ARROW_JSON_EXT_NAME: &str = "arrow.json"; + /// Check if a field is a JSON 
extension field (Lance internal JSONB storage) pub fn is_json_field(field: &ArrowField) -> bool { field.data_type() == &DataType::LargeBinary @@ -26,6 +29,17 @@ pub fn is_json_field(field: &ArrowField) -> bool { .unwrap_or_default() } +/// Check if a field is an Arrow JSON extension field (PyArrow pa.json() type) +pub fn is_arrow_json_field(field: &ArrowField) -> bool { + // Arrow JSON extension type uses Utf8 or LargeUtf8 as storage type + (field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8) + && field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == ARROW_JSON_EXT_NAME) + .unwrap_or_default() +} + /// Check if a field or any of its descendants is a JSON field pub fn has_json_fields(field: &ArrowField) -> bool { if is_json_field(field) { @@ -323,6 +337,156 @@ fn get_json_path( } } +/// Convert an Arrow JSON field to Lance JSON field (with JSONB storage) +pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField { + if is_arrow_json_field(field) { + // Convert Arrow JSON (Utf8/LargeUtf8) to Lance JSON (LargeBinary) + json_field(field.name(), field.is_nullable()) + } else { + field.clone() + } +} + +/// Convert a RecordBatch with Lance JSON columns (JSONB) back to Arrow JSON format (strings) +pub fn convert_lance_json_to_arrow( + batch: &arrow_array::RecordBatch, +) -> Result<RecordBatch, ArrowError> { + let schema = batch.schema(); + let mut needs_conversion = false; + let mut new_fields = Vec::with_capacity(schema.fields().len()); + let mut new_columns = Vec::with_capacity(batch.num_columns()); + + for (i, field) in schema.fields().iter().enumerate() { + let column = batch.column(i); + + if is_json_field(field) { + needs_conversion = true; + + // Convert the field back to Arrow JSON (Utf8) + let mut new_field = ArrowField::new(field.name(), DataType::Utf8, field.is_nullable()); + let mut metadata = field.metadata().clone(); + // Change from lance.json to arrow.json + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + 
ARROW_JSON_EXT_NAME.to_string(), + ); + new_field.set_metadata(metadata); + new_fields.push(new_field); + + // Convert the data from JSONB to JSON strings + if batch.num_rows() == 0 { + // For empty batches, create an empty String array + let empty_strings = arrow_array::builder::StringBuilder::new().finish(); + new_columns.push(Arc::new(empty_strings) as ArrayRef); + } else { + // Convert JSONB back to JSON strings + let binary_array = column + .as_any() + .downcast_ref::<LargeBinaryArray>() + .ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Lance JSON field '{}' has unexpected type", + field.name() + )) + })?; + + let mut builder = arrow_array::builder::StringBuilder::new(); + for i in 0..binary_array.len() { + if binary_array.is_null(i) { + builder.append_null(); + } else { + let jsonb_bytes = binary_array.value(i); + let json_str = decode_json(jsonb_bytes).map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Failed to decode JSON: {}", + e + )) + })?; + builder.append_value(&json_str); + } + } + new_columns.push(Arc::new(builder.finish()) as ArrayRef); + } + } else { + new_fields.push(field.as_ref().clone()); + new_columns.push(column.clone()); + } + } + + if needs_conversion { + let new_schema = Arc::new(Schema::new_with_metadata( + new_fields, + schema.metadata().clone(), + )); + RecordBatch::try_new(new_schema, new_columns) + } else { + // No conversion needed, return original batch + Ok(batch.clone()) + } +} + +/// Convert a RecordBatch with Arrow JSON columns to Lance JSON format (JSONB) +pub fn convert_json_columns( + batch: &arrow_array::RecordBatch, +) -> Result<RecordBatch, ArrowError> { + let schema = batch.schema(); + let mut needs_conversion = false; + let mut new_fields = Vec::with_capacity(schema.fields().len()); + let mut new_columns = Vec::with_capacity(batch.num_columns()); + + for (i, field) in schema.fields().iter().enumerate() { + let column = batch.column(i); + + if is_arrow_json_field(field) { + needs_conversion = true; + + // Convert the field metadata + 
new_fields.push(arrow_json_to_lance_json(field)); + + // Convert the data from JSON strings to JSONB + if batch.num_rows() == 0 { + // For empty batches, create an empty LargeBinary array + let empty_binary = LargeBinaryBuilder::new().finish(); + new_columns.push(Arc::new(empty_binary) as ArrayRef); + } else { + // Convert non-empty data + let json_array = + if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() { + JsonArray::try_from(string_array)? + } else if let Some(large_string_array) = + column.as_any().downcast_ref::<LargeStringArray>() + { + JsonArray::try_from(large_string_array)? + } else { + return Err(ArrowError::InvalidArgumentError(format!( + "Arrow JSON field '{}' has unexpected storage type: {:?}", + field.name(), + column.data_type() + ))); + }; + + let binary_array = json_array.into_inner(); + + new_columns.push(Arc::new(binary_array) as ArrayRef); + } + } else { + new_fields.push(field.as_ref().clone()); + new_columns.push(column.clone()); + } + } + + if needs_conversion { + let new_schema = Arc::new(Schema::new_with_metadata( + new_fields, + schema.metadata().clone(), + )); + RecordBatch::try_new(new_schema, new_columns) + } else { + // No conversion needed, return original batch + Ok(batch.clone()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -383,4 +547,52 @@ mod tests { let age = json_array.json_path(1, "$.user.age").unwrap(); assert_eq!(age, None); } + + #[test] + fn test_convert_json_columns() { + // Create a batch with Arrow JSON column + let json_strings = vec![Some(r#"{"name": "Alice"}"#), Some(r#"{"name": "Bob"}"#)]; + let json_arr = StringArray::from(json_strings); + + // Create field with arrow.json extension + let mut field = ArrowField::new("data", DataType::Utf8, false); + let mut metadata = std::collections::HashMap::new(); + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + field.set_metadata(metadata); + + let schema = Arc::new(Schema::new(vec![field])); + let batch = 
RecordBatch::try_new(schema, vec![Arc::new(json_arr) as ArrayRef]).unwrap(); + + // Convert the batch + let converted = convert_json_columns(&batch).unwrap(); + + // Check the converted schema + assert_eq!(converted.num_columns(), 1); + let converted_schema = converted.schema(); + let converted_field = converted_schema.field(0); + assert_eq!(converted_field.data_type(), &DataType::LargeBinary); + assert_eq!( + converted_field.metadata().get(ARROW_EXT_NAME_KEY), + Some(&JSON_EXT_NAME.to_string()) + ); + + // Check the data was converted + let converted_column = converted.column(0); + assert_eq!(converted_column.data_type(), &DataType::LargeBinary); + assert_eq!(converted_column.len(), 2); + + // Verify the data is valid JSONB + let binary_array = converted_column + .as_any() + .downcast_ref::<LargeBinaryArray>() + .unwrap(); + for i in 0..binary_array.len() { + let jsonb_bytes = binary_array.value(i); + let decoded = decode_json(jsonb_bytes).unwrap(); + assert!(decoded.contains("name")); + } + } } diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 1d236dfd313..0c0b635b9ad 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -20,7 +20,10 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field as ArrowField}; use deepsize::DeepSizeOf; -use lance_arrow::{json::is_json_field, ARROW_EXT_NAME_KEY, *}; +use lance_arrow::{ + json::{is_arrow_json_field, is_json_field}, + ARROW_EXT_NAME_KEY, *, +}; use snafu::location; use super::{ @@ -988,8 +991,8 @@ impl TryFrom<&ArrowField> for Field { .map(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes")) .unwrap_or(false); - // Check for JSON extension type - let logical_type = if is_json_field(field) { + // Check for JSON extension types (both Arrow and Lance) + let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") } else { LogicalType::try_from(field.data_type())? 
diff --git a/rust/lance-datafusion/src/udf/json.rs b/rust/lance-datafusion/src/udf/json.rs index eaf50a7ba99..5257457fcc2 100644 --- a/rust/lance-datafusion/src/udf/json.rs +++ b/rust/lance-datafusion/src/udf/json.rs @@ -84,6 +84,20 @@ mod common { .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1))) } + /// Get string value at index, handling scalar broadcast case + /// When a scalar is converted to an array, it becomes a single-element array + /// This function handles accessing that value repeatedly for all rows + pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> { + // Handle scalar broadcast case: if array has only 1 element, always use index 0 + let actual_index = if string_array.len() == 1 { 0 } else { index }; + + if string_array.is_null(actual_index) { + None + } else { + Some(string_array.value(actual_index)) + } + } + /// Get JSON field/element using pre-parsed key type (avoids repeated parsing) pub fn get_json_value_by_key( raw_jsonb: &jsonb::RawJsonb, @@ -182,7 +196,7 @@ fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result> { .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e))) } -/// Create the json_extract UDF for extracting JSONPath from JSONB data +/// Create the json_extract UDF for extracting JSONPath from JSON data /// /// # Arguments /// * First parameter: JSONB binary data (LargeBinary) @@ -213,20 +227,19 @@ fn json_extract_impl(args: &[ArrayRef]) -> Result { let jsonb_array = common::extract_jsonb_array(args)?; let path_array = common::extract_string_array(args, 1)?; - let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || path_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(path) = common::get_string_value_at(path_array, i) { let jsonb_bytes = jsonb_array.value(i); - let path = path_array.value(i); - 
match extract_json_path(jsonb_bytes, path)? { Some(value) => builder.append_value(&value), None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -292,14 +305,16 @@ fn json_exists_impl(args: &[ArrayRef]) -> Result { let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || path_array.is_null(i) { - builder.append_null(); - } else { + if jsonb_array.is_null(i) { + // For null JSON values, return false instead of null + // This allows filters like "json_exists(data, '$.field') = false" to match null JSON + builder.append_value(false); + } else if let Some(path) = common::get_string_value_at(path_array, i) { let jsonb_bytes = jsonb_array.value(i); - let path = path_array.value(i); - let exists = check_json_path_exists(jsonb_bytes, path)?; builder.append_value(exists); + } else { + builder.append_null(); } } @@ -356,11 +371,10 @@ fn json_get_impl(args: &[ArrayRef]) -> Result { let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || key_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(key) = common::get_string_value_at(key_array, i) { let jsonb_bytes = jsonb_array.value(i); - let key = key_array.value(i); let key_type = common::KeyType::parse(key); let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); @@ -368,6 +382,8 @@ fn json_get_impl(args: &[ArrayRef]) -> Result { Some(value) => builder.append_value(value.as_raw().as_ref()), None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -409,11 +425,10 @@ fn json_get_string_impl(args: &[ArrayRef]) -> Result { let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || key_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(key) = 
common::get_string_value_at(key_array, i) { let jsonb_bytes = jsonb_array.value(i); - let key = key_array.value(i); let key_type = common::KeyType::parse(key); let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); @@ -424,6 +439,8 @@ fn json_get_string_impl(args: &[ArrayRef]) -> Result { }, None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -465,11 +482,10 @@ fn json_get_int_impl(args: &[ArrayRef]) -> Result { let mut builder = Int64Builder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || key_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(key) = common::get_string_value_at(key_array, i) { let jsonb_bytes = jsonb_array.value(i); - let key = key_array.value(i); let key_type = common::KeyType::parse(key); let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); @@ -480,6 +496,8 @@ fn json_get_int_impl(args: &[ArrayRef]) -> Result { }, None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -521,11 +539,10 @@ fn json_get_float_impl(args: &[ArrayRef]) -> Result { let mut builder = Float64Builder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || key_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(key) = common::get_string_value_at(key_array, i) { let jsonb_bytes = jsonb_array.value(i); - let key = key_array.value(i); let key_type = common::KeyType::parse(key); let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); @@ -536,6 +553,8 @@ fn json_get_float_impl(args: &[ArrayRef]) -> Result { }, None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -577,11 +596,10 @@ fn json_get_bool_impl(args: &[ArrayRef]) -> Result { let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || key_array.is_null(i) { + if jsonb_array.is_null(i) { 
builder.append_null(); - } else { + } else if let Some(key) = common::get_string_value_at(key_array, i) { let jsonb_bytes = jsonb_array.value(i); - let key = key_array.value(i); let key_type = common::KeyType::parse(key); let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes); @@ -592,6 +610,8 @@ fn json_get_bool_impl(args: &[ArrayRef]) -> Result { }, None => builder.append_null(), } + } else { + builder.append_null(); } } @@ -635,15 +655,20 @@ fn json_array_contains_impl(args: &[ArrayRef]) -> Result { let mut builder = BooleanBuilder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || path_array.is_null(i) || value_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); } else { - let jsonb_bytes = jsonb_array.value(i); - let path = path_array.value(i); - let value = value_array.value(i); - - let contains = check_array_contains(jsonb_bytes, path, value)?; - builder.append_value(contains); + let path = common::get_string_value_at(path_array, i); + let value = common::get_string_value_at(value_array, i); + + match (path, value) { + (Some(p), Some(v)) => { + let jsonb_bytes = jsonb_array.value(i); + let contains = check_array_contains(jsonb_bytes, p, v)?; + builder.append_value(contains); + } + _ => builder.append_null(), + } } } @@ -722,16 +747,16 @@ fn json_array_length_impl(args: &[ArrayRef]) -> Result { let mut builder = Int64Builder::with_capacity(jsonb_array.len()); for i in 0..jsonb_array.len() { - if jsonb_array.is_null(i) || path_array.is_null(i) { + if jsonb_array.is_null(i) { builder.append_null(); - } else { + } else if let Some(path) = common::get_string_value_at(path_array, i) { let jsonb_bytes = jsonb_array.value(i); - let path = path_array.value(i); - match get_array_length(jsonb_bytes, path)? 
{ Some(len) => builder.append_value(len), None => builder.append_null(), } + } else { + builder.append_null(); } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f06acffa73e..1c9ee6d721c 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -71,6 +71,7 @@ use tracing::{info_span, instrument, Span}; use super::Dataset; use crate::dataset::row_offsets_to_row_addresses; +use crate::dataset::utils::wrap_json_stream_for_reading; use crate::index::scalar::detect_scalar_index_type; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; use crate::index::DatasetIndexInternalExt; @@ -85,6 +86,7 @@ use crate::io::exec::{ }; use crate::{datatypes::Schema, io::exec::fts::BooleanQueryExec}; use crate::{Error, Result}; + use snafu::location; pub use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; @@ -3460,6 +3462,11 @@ pub struct DatasetRecordBatchStream { impl DatasetRecordBatchStream { pub fn new(exec_node: SendableRecordBatchStream) -> Self { + // Convert lance.json (JSONB) back to arrow.json (strings) for reading + // + // This is so bad, we need to find a way to remove this. 
+ let exec_node = wrap_json_stream_for_reading(exec_node); + let span = info_span!("DatasetRecordBatchStream"); Self { exec_node, span } } diff --git a/rust/lance/src/dataset/utils.rs b/rust/lance/src/dataset/utils.rs index 4595d439683..f9241e9295d 100644 --- a/rust/lance/src/dataset/utils.rs +++ b/rust/lance/src/dataset/utils.rs @@ -1,20 +1,24 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use crate::Result; use arrow_array::{RecordBatch, UInt64Array}; +use arrow_schema::Schema as ArrowSchema; use datafusion::error::Result as DFResult; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; +use lance_arrow::json::{ + arrow_json_to_lance_json, convert_json_columns, convert_lance_json_to_arrow, + is_arrow_json_field, is_json_field, +}; +use lance_core::ROW_ID; +use lance_table::rowids::{RowIdIndex, RowIdSequence}; use roaring::RoaringTreemap; use std::borrow::Cow; use std::sync::mpsc::Receiver; use std::sync::Arc; -use crate::Result; -use lance_core::ROW_ID; -use lance_table::rowids::{RowIdIndex, RowIdSequence}; - fn extract_row_ids( row_ids: &mut CapturedRowIds, batch: RecordBatch, @@ -133,3 +137,110 @@ impl Default for CapturedRowIds { Self::AddressStyle(RoaringTreemap::new()) } } + +/// Wrap a stream to convert arrow.json to lance.json for writing +/// +// FIXME: this is bad, really bad, we need to find a way to remove this. 
+pub fn wrap_json_stream_for_writing( + stream: SendableRecordBatchStream, +) -> SendableRecordBatchStream { + // Check if any fields need conversion + let needs_conversion = stream + .schema() + .fields() + .iter() + .any(|f| is_arrow_json_field(f)); + + if !needs_conversion { + return stream; + } + + // Convert the schema + let arrow_schema = stream.schema(); + let mut new_fields = Vec::with_capacity(arrow_schema.fields().len()); + for field in arrow_schema.fields() { + if is_arrow_json_field(field) { + new_fields.push(arrow_json_to_lance_json(field)); + } else { + new_fields.push(field.as_ref().clone()); + } + } + let converted_schema = Arc::new(ArrowSchema::new_with_metadata( + new_fields, + arrow_schema.metadata().clone(), + )); + + // Convert the stream + let converted_stream = stream.map(move |batch_result| { + batch_result.and_then(|batch| { + convert_json_columns(&batch) + .map_err(|e| datafusion::error::DataFusionError::ArrowError(e, None)) + }) + }); + + Box::pin(RecordBatchStreamAdapter::new( + converted_schema, + converted_stream, + )) +} + +/// Wrap a stream to convert lance.json (JSONB) back to arrow.json (strings) for reading +/// +// FIXME: this is bad, really bad, we need to find a way to remove this. 
+pub fn wrap_json_stream_for_reading( + stream: SendableRecordBatchStream, +) -> SendableRecordBatchStream { + use lance_arrow::json::ARROW_JSON_EXT_NAME; + use lance_arrow::ARROW_EXT_NAME_KEY; + + // Check if any fields need conversion + let needs_conversion = stream.schema().fields().iter().any(|f| is_json_field(f)); + + if !needs_conversion { + return stream; + } + + // Convert the schema + let arrow_schema = stream.schema(); + let mut new_fields = Vec::with_capacity(arrow_schema.fields().len()); + for field in arrow_schema.fields() { + if is_json_field(field) { + // Convert lance.json (LargeBinary) to arrow.json (Utf8) + let mut new_field = arrow_schema::Field::new( + field.name(), + arrow_schema::DataType::Utf8, + field.is_nullable(), + ); + let mut metadata = field.metadata().clone(); + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + ); + new_field.set_metadata(metadata); + new_fields.push(new_field); + } else { + new_fields.push(field.as_ref().clone()); + } + } + let converted_schema = Arc::new(ArrowSchema::new_with_metadata( + new_fields, + arrow_schema.metadata().clone(), + )); + + // Convert the stream + let converted_stream = stream.map(move |batch_result| { + batch_result.and_then(|batch| { + convert_lance_json_to_arrow(&batch).map_err(|e| { + datafusion::error::DataFusionError::ArrowError( + arrow_schema::ArrowError::InvalidArgumentError(e.to_string()), + None, + ) + }) + }) + }); + + Box::pin(RecordBatchStreamAdapter::new( + converted_schema, + converted_stream, + )) +} diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 22a6ceeb9a7..0a9ecf45539 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -38,6 +38,7 @@ use crate::Dataset; use super::blob::BlobStreamExt; use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress}; use super::transaction::Transaction; +use super::utils::wrap_json_stream_for_writing; use super::DATA_DIR; mod 
commit; @@ -305,6 +306,11 @@ pub async fn do_write_fragments( params: WriteParams, storage_version: LanceFileVersion, ) -> Result> { + // Convert arrow.json to lance.json (JSONB) for storage if needed + // + // FIXME: this is bad, really bad, we need to find a way to remove this. + let data = wrap_json_stream_for_writing(data); + let mut buffered_reader = if storage_version == LanceFileVersion::Legacy { // In v1 we split the stream into row group sized batches chunk_stream(data, params.max_rows_per_group) @@ -382,10 +388,19 @@ pub async fn write_fragments_internal( dataset: Option<&Dataset>, object_store: Arc, base_dir: &Path, - schema: Schema, + _schema: Schema, data: SendableRecordBatchStream, mut params: WriteParams, ) -> Result { + // Convert Arrow JSON columns to Lance JSON (JSONB) format + // + // FIXME: this is bad, really bad, we need to find a way to remove this. + let data = wrap_json_stream_for_writing(data); + + // Update the schema to match the converted data + let arrow_schema = data.schema(); + let converted_schema = Schema::try_from(arrow_schema.as_ref())?; + // Make sure the max rows per group is not larger than the max rows per file params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file); @@ -393,7 +408,7 @@ pub async fn write_fragments_internal( match params.mode { WriteMode::Append | WriteMode::Create => { // Append mode, so we need to check compatibility - schema.check_compatible( + converted_schema.check_compatible( dataset.schema(), &SchemaCompareOptions { // We don't care if the user claims their data is nullable / non-nullable. We will @@ -407,7 +422,7 @@ pub async fn write_fragments_internal( )?; // Project from the dataset schema, because it has the correct field ids. 
let write_schema = dataset.schema().project_by_schema( - &schema, + &converted_schema, OnMissing::Error, OnTypeMismatch::Error, )?; @@ -428,13 +443,13 @@ pub async fn write_fragments_internal( .data_storage_format .lance_file_version()?, ); - (schema, data_storage_version) + (converted_schema, data_storage_version) } } } else { // Brand new dataset, use the schema from the data and the storage version // from the user or the default. - (schema, params.storage_version_or_default()) + (converted_schema, params.storage_version_or_default()) }; let data_schema = schema.project_by_schema( From 0351d80bebddca4610671d14b19f8001aa7c42cc Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 03:58:30 +0800 Subject: [PATCH 07/11] Fix tests Signed-off-by: Xuanwo --- rust/lance-arrow/src/json.rs | 12 +++++++++++- rust/lance/src/dataset/utils.rs | 4 ++-- rust/lance/src/dataset/write.rs | 25 +++++++++++++++++++------ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/rust/lance-arrow/src/json.rs b/rust/lance-arrow/src/json.rs index 2b5977f8cf8..1b1e4dead0e 100644 --- a/rust/lance-arrow/src/json.rs +++ b/rust/lance-arrow/src/json.rs @@ -341,7 +341,17 @@ fn get_json_path( pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField { if is_arrow_json_field(field) { // Convert Arrow JSON (Utf8/LargeUtf8) to Lance JSON (LargeBinary) - json_field(field.name(), field.is_nullable()) + // Preserve all metadata from the original field + let mut new_field = + ArrowField::new(field.name(), DataType::LargeBinary, field.is_nullable()); + + // Copy all metadata from the original field + let mut metadata = field.metadata().clone(); + // Add/override the extension metadata for Lance JSON + metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string()); + + new_field = new_field.with_metadata(metadata); + new_field } else { field.clone() } diff --git a/rust/lance/src/dataset/utils.rs b/rust/lance/src/dataset/utils.rs index f9241e9295d..c528f6d8c2f 100644 --- 
a/rust/lance/src/dataset/utils.rs +++ b/rust/lance/src/dataset/utils.rs @@ -160,9 +160,9 @@ pub fn wrap_json_stream_for_writing( let mut new_fields = Vec::with_capacity(arrow_schema.fields().len()); for field in arrow_schema.fields() { if is_arrow_json_field(field) { - new_fields.push(arrow_json_to_lance_json(field)); + new_fields.push(Arc::new(arrow_json_to_lance_json(field))); } else { - new_fields.push(field.as_ref().clone()); + new_fields.push(Arc::clone(field)); } } let converted_schema = Arc::new(ArrowSchema::new_with_metadata( diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 0a9ecf45539..f09b2485629 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -41,6 +41,8 @@ use super::transaction::Transaction; use super::utils::wrap_json_stream_for_writing; use super::DATA_DIR; +use lance_arrow::json::is_arrow_json_field; + mod commit; pub mod delete; mod insert; @@ -388,18 +390,29 @@ pub async fn write_fragments_internal( dataset: Option<&Dataset>, object_store: Arc, base_dir: &Path, - _schema: Schema, + schema: Schema, data: SendableRecordBatchStream, mut params: WriteParams, ) -> Result { // Convert Arrow JSON columns to Lance JSON (JSONB) format // // FIXME: this is bad, really bad, we need to find a way to remove this. 
- let data = wrap_json_stream_for_writing(data); - - // Update the schema to match the converted data - let arrow_schema = data.schema(); - let converted_schema = Schema::try_from(arrow_schema.as_ref())?; + let needs_conversion = data + .schema() + .fields() + .iter() + .any(|f| is_arrow_json_field(f)); + + let (data, converted_schema) = if needs_conversion { + let data = wrap_json_stream_for_writing(data); + // Update the schema to match the converted data + let arrow_schema = data.schema(); + let converted_schema = Schema::try_from(arrow_schema.as_ref())?; + (data, converted_schema) + } else { + // No conversion needed, use original schema to preserve dictionary info + (data, schema) + }; // Make sure the max rows per group is not larger than the max rows per file params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file); From 864efc2043191467b696b34b29d0de2d0c8ec539 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 10:49:46 +0800 Subject: [PATCH 08/11] Fix tests Signed-off-by: Xuanwo --- rust/lance-datafusion/src/udf/json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-datafusion/src/udf/json.rs b/rust/lance-datafusion/src/udf/json.rs index 5257457fcc2..b782be4892a 100644 --- a/rust/lance-datafusion/src/udf/json.rs +++ b/rust/lance-datafusion/src/udf/json.rs @@ -868,7 +868,7 @@ mod tests { assert!(bool_array.value(0)); assert!(!bool_array.value(1)); assert!(bool_array.value(2)); - assert!(bool_array.is_null(3)); + assert!(!bool_array.value(3)); Ok(()) } From c3173e492b9fc650d9dd99b85f7d8223c25c7c32 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 10:54:08 +0800 Subject: [PATCH 09/11] Fix json behavior in null Signed-off-by: Xuanwo --- rust/lance-datafusion/src/udf/json.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/lance-datafusion/src/udf/json.rs b/rust/lance-datafusion/src/udf/json.rs index b782be4892a..c8c29c2f7cd 100644 --- 
a/rust/lance-datafusion/src/udf/json.rs +++ b/rust/lance-datafusion/src/udf/json.rs @@ -306,9 +306,7 @@ fn json_exists_impl(args: &[ArrayRef]) -> Result { for i in 0..jsonb_array.len() { if jsonb_array.is_null(i) { - // For null JSON values, return false instead of null - // This allows filters like "json_exists(data, '$.field') = false" to match null JSON - builder.append_value(false); + builder.append_null(); } else if let Some(path) = common::get_string_value_at(path_array, i) { let jsonb_bytes = jsonb_array.value(i); let exists = check_json_path_exists(jsonb_bytes, path)?; @@ -868,7 +866,7 @@ mod tests { assert!(bool_array.value(0)); assert!(!bool_array.value(1)); assert!(bool_array.value(2)); - assert!(!bool_array.value(3)); + assert!(bool_array.is_null(3)); Ok(()) } From c6b5fe873bc20a597146dd2ed89062b26d6528b4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 11:15:48 +0800 Subject: [PATCH 10/11] Fix datafusion version Signed-off-by: Xuanwo --- python/pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 24a6ea2a00e..88b42376979 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -60,7 +60,8 @@ tests = [ # tensorflow is broken on 2.20.0 with macOS ARM. 
"tensorflow<=2.19.0", "tqdm", - "datafusion", + # Need to align with the datafusion version we use at lance rust + "datafusion==48.0.0,", ] dev = ["ruff==0.4.1", "pyright"] benchmarks = ["pytest-benchmark"] From c59315f12f690015ad7cc3142079d74dd1e4d573 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 29 Aug 2025 11:24:30 +0800 Subject: [PATCH 11/11] Fix typo Signed-off-by: Xuanwo --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 88b42376979..a90aac3f408 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -61,7 +61,7 @@ tests = [ "tensorflow<=2.19.0", "tqdm", # Need to align with the datafusion version we use at lance rust - "datafusion==48.0.0,", + "datafusion==48.0.0", ] dev = ["ruff==0.4.1", "pyright"] benchmarks = ["pytest-benchmark"]