Skip to content

Commit 716d908

Browse files
Empreiteiroautofix-ci[bot]erichare
authored
feat: Enhance Data Operations Component with JSON features (#9688)
* feat: Enhance Data Operations Component with JSON features This PR enhances the DataOperationsComponent by introducing two new powerful operations: JSON Path: Allows extracting values from nested JSON structures using path expressions, enabling more flexible data selection. JSON Query: Integrates jq to support advanced JSON queries, including filters, projections, and transformations. Additional improvements: Input handling and validations for both new operations. Enhanced error messages and logs for better debugging. Extended UI field visibility logic in update_build_config to support dynamic path selection from JSON. These updates significantly expand the component's capabilities for parsing, transforming, and querying structured JSON data. Summary by CodeRabbit New Features Added Mapped JSON and jQuery actions to map and query JSON, including JSON path extraction and jq-based queries. Automatically repairs and parses malformed JSON before processing. New inputs for JSON operations: multiline JSON preview, dynamic key selector, and query field. Documentation Updated in-app help texts to clarify top-level key constraints and provide guidance for JSON mapping and querying. * [autofix.ci] apply automated fixes * feat: Refactor data_operations.py for clarity and updates * [autofix.ci] apply automated fixes * Update data_operations.py * [autofix.ci] apply automated fixes * Fix ruff check --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Eric Hare <[email protected]>
1 parent 0c54a11 commit 716d908

File tree

1 file changed

+160
-74
lines changed

1 file changed

+160
-74
lines changed

src/lfx/src/lfx/components/processing/data_operations.py

Lines changed: 160 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
import ast
2+
import json
23
from typing import TYPE_CHECKING, Any
34

5+
import jq
6+
from json_repair import repair_json
7+
48
from lfx.custom import Component
59
from lfx.inputs import DictInput, DropdownInput, MessageTextInput, SortableListInput
6-
from lfx.io import DataInput, Output
10+
from lfx.io import DataInput, MultilineInput, Output
711
from lfx.log.logger import logger
812
from lfx.schema import Data
913
from lfx.schema.dotdict import dotdict
@@ -20,6 +24,8 @@
2024
"Append or Update": {"is_list": False, "log_msg": "setting Append or Update fields"},
2125
"Remove Keys": {"is_list": False, "log_msg": "setting remove keys fields"},
2226
"Rename Keys": {"is_list": False, "log_msg": "setting rename keys fields"},
27+
"Path Selection": {"is_list": False, "log_msg": "setting mapped key extractor fields"},
28+
"JQ Expression": {"is_list": False, "log_msg": "setting parse json fields"},
2329
}
2430
OPERATORS = {
2531
"equals": lambda a, b: str(a) == str(b),
@@ -33,7 +39,6 @@
3339
class DataOperationsComponent(Component):
3440
display_name = "Data Operations"
3541
description = "Perform various operations on a Data object."
36-
documentation: str = "https://docs.langflow.org/components-processing#data-operations"
3742
icon = "file-json"
3843
name = "DataOperations"
3944
default_keys = ["operations", "data"]
@@ -59,6 +64,9 @@ class DataOperationsComponent(Component):
5964
"data filtering",
6065
"data selection",
6166
"data combination",
67+
"Parse JSON",
68+
"JSON Query",
69+
"JQ Query",
6270
],
6371
}
6472
actions_data = {
@@ -69,8 +77,47 @@ class DataOperationsComponent(Component):
6977
"Append or Update": ["append_update_data", "operations"],
7078
"Remove Keys": ["remove_keys_input", "operations"],
7179
"Rename Keys": ["rename_keys_input", "operations"],
80+
"Path Selection": ["mapped_json_display", "selected_key", "operations"],
81+
"JQ Expression": ["query", "operations"],
7282
}
7383

84+
@staticmethod
def extract_all_paths(obj, path=""):
    """Collect jq path expressions for every key reachable in a parsed JSON value.

    Dictionaries contribute one path per key and are descended into. Non-empty
    lists contribute a single ``[0]`` path and only their first element is
    descended into. Scalars and empty lists contribute nothing.

    Args:
        obj: Parsed JSON value (dict, list or scalar) to walk.
        path: jq path prefix accumulated so far; empty for the root.

    Returns:
        A list of jq path strings in depth-first order, each parent path
        immediately followed by the paths of its descendants.
    """

    def _segment(key):
        # Plain ASCII identifiers can use the short ``.name`` form. Anything else
        # (spaces, dashes, dots, leading digits, non-ASCII) must be written as a
        # quoted string, otherwise jq parses it as a different expression.
        text = str(key)
        if text.isascii() and text.isidentifier():
            return f".{text}"
        return f".{json.dumps(text)}"

    def _walk(node, prefix):
        found = []
        if isinstance(node, dict):
            for key, value in node.items():
                child = f"{prefix}{_segment(key)}"
                found.append(child)
                found.extend(_walk(value, child))
        elif isinstance(node, list) and node:
            # At the root the index needs a leading dot: a bare ``[0]`` is an
            # array constructor in jq, not an index into the input.
            child = f"{prefix}[0]" if prefix else ".[0]"
            found.append(child)
            found.extend(_walk(node[0], child))
        return found

    return _walk(obj, path)
97+
98+
@staticmethod
99+
def remove_keys_recursive(obj, keys_to_remove):
100+
if isinstance(obj, dict):
101+
return {
102+
k: DataOperationsComponent.remove_keys_recursive(v, keys_to_remove)
103+
for k, v in obj.items()
104+
if k not in keys_to_remove
105+
}
106+
if isinstance(obj, list):
107+
return [DataOperationsComponent.remove_keys_recursive(item, keys_to_remove) for item in obj]
108+
return obj
109+
110+
@staticmethod
111+
def rename_keys_recursive(obj, rename_map):
112+
if isinstance(obj, dict):
113+
return {
114+
rename_map.get(k, k): DataOperationsComponent.rename_keys_recursive(v, rename_map)
115+
for k, v in obj.items()
116+
}
117+
if isinstance(obj, list):
118+
return [DataOperationsComponent.rename_keys_recursive(item, rename_map) for item in obj]
119+
return obj
120+
74121
inputs = [
75122
DataInput(name="data", display_name="Data", info="Data object to filter.", required=True, is_list=True),
76123
SortableListInput(
@@ -86,6 +133,8 @@ class DataOperationsComponent(Component):
86133
{"name": "Append or Update", "icon": "circle-plus"},
87134
{"name": "Remove Keys", "icon": "eraser"},
88135
{"name": "Rename Keys", "icon": "pencil-line"},
136+
{"name": "Path Selection", "icon": "mouse-pointer"},
137+
{"name": "JQ Expression", "icon": "terminal"},
89138
],
90139
real_time_refresh=True,
91140
limit=1,
@@ -94,15 +143,18 @@ class DataOperationsComponent(Component):
94143
MessageTextInput(
95144
name="select_keys_input",
96145
display_name="Select Keys",
97-
info="List of keys to select from the data.",
146+
info="List of keys to select from the data. Only top-level keys can be selected.",
98147
show=False,
99148
is_list=True,
100149
),
101150
# filter values inputs
102151
MessageTextInput(
103152
name="filter_key",
104153
display_name="Filter Key",
105-
info="Key to filter by.",
154+
info=(
155+
"Name of the key containing the list to filter. "
156+
"It must be a top-level key in the JSON and its value must be a list."
157+
),
106158
is_list=True,
107159
show=False,
108160
),
@@ -126,7 +178,7 @@ class DataOperationsComponent(Component):
126178
DictInput(
127179
name="append_update_data",
128180
display_name="Append or Update",
129-
info="Data to Append or Updatethe existing data with.",
181+
info="Data to append or update the existing data with. Only top-level keys are checked.",
130182
show=False,
131183
value={"key": "value"},
132184
is_list=True,
@@ -148,6 +200,26 @@ class DataOperationsComponent(Component):
148200
is_list=True,
149201
value={"old_key": "new_key"},
150202
),
203+
MultilineInput(
204+
name="mapped_json_display",
205+
display_name="JSON to Map",
206+
info="Paste or preview your JSON here to explore its structure and select a path for extraction.",
207+
required=False,
208+
refresh_button=True,
209+
real_time_refresh=True,
210+
placeholder="Add a JSON example.",
211+
show=False,
212+
),
213+
DropdownInput(
214+
name="selected_key", display_name="Select Path", options=[], required=False, dynamic=True, show=False
215+
),
216+
MessageTextInput(
217+
name="query",
218+
display_name="JQ Expression",
219+
info="JSON Query to filter the data. Used by Parse JSON operation.",
220+
placeholder="e.g., .properties.id",
221+
show=False,
222+
),
151223
]
152224
outputs = [
153225
Output(display_name="Data", name="data_output", method="as_data"),
@@ -156,10 +228,39 @@ class DataOperationsComponent(Component):
156228
# Helper methods for data operations
157229
def get_data_dict(self) -> dict:
158230
"""Extract data dictionary from Data object."""
159-
# TODO: rasie error if it s list of data objects
160231
data = self.data[0] if isinstance(self.data, list) and len(self.data) == 1 else self.data
161232
return data.model_dump()
162233

234+
def json_query(self) -> Data:
    """Run the configured jq expression against the input data.

    The dumped data is serialized to JSON text, passed through ``repair_json``
    and parsed back; when the parsed value is a dict with a ``"data"`` key, that
    inner value is what the expression runs on. A single output is unwrapped,
    several outputs are kept as a list. Dict results become the returned
    ``Data`` payload directly; anything else is stored under ``"result"``.

    Raises:
        ValueError: If the expression is blank, yields no output, yields
            null / ``"None"``, or any step of the query fails.
    """
    import json

    import jq

    expression = self.query
    if not expression or not expression.strip():
        msg = "JSON Query is required and cannot be blank."
        raise ValueError(msg)
    raw_data = self.get_data_dict()
    try:
        data_json = json.loads(repair_json(json.dumps(raw_data)))
        jq_input = data_json
        if isinstance(data_json, dict) and "data" in data_json:
            jq_input = data_json["data"]
        results = jq.compile(self.query).input(jq_input).all()
        if not results:
            msg = "No result from JSON query."
            raise ValueError(msg)
        result = results if len(results) != 1 else results[0]
        if result is None or result == "None":
            msg = "JSON query returned null/None. Check if the path exists in your data."
            raise ValueError(msg)
        payload = result if isinstance(result, dict) else {"result": result}
        return Data(data=payload)
    except (ValueError, TypeError, KeyError, json.JSONDecodeError) as e:
        # Failures raised above are deliberately funnelled through here too, so
        # every error is logged and re-raised with the same prefix.
        logger.error(f"JSON Query failed: {e}")
        msg = f"JSON Query error: {e}"
        raise ValueError(msg) from e
263+
163264
def get_normalized_data(self) -> dict:
164265
"""Get normalized data dictionary, handling the 'data' key if present."""
165266
data_dict = self.get_data_dict()
@@ -204,34 +305,22 @@ def select_keys(self, *, evaluate: bool | None = None) -> Data:
204305
return Data(data=filtered)
205306

206307
def remove_keys(self) -> Data:
207-
"""Remove specified keys from the data dictionary."""
308+
"""Remove specified keys from the data dictionary, recursively."""
208309
self.validate_single_data("Remove Keys")
209310
data_dict = self.get_normalized_data()
210311
remove_keys_input: list[str] = self.remove_keys_input
211312

212-
for key in remove_keys_input:
213-
if key in data_dict:
214-
data_dict.pop(key)
215-
else:
216-
logger.warning(f"Key '{key}' not found in data. Skipping removal.")
217-
218-
return Data(**data_dict)
313+
filtered = DataOperationsComponent.remove_keys_recursive(data_dict, set(remove_keys_input))
314+
return Data(data=filtered)
219315

220316
def rename_keys(self) -> Data:
221-
"""Rename keys in the data dictionary."""
317+
"""Rename keys in the data dictionary, recursively."""
222318
self.validate_single_data("Rename Keys")
223319
data_dict = self.get_normalized_data()
224320
rename_keys_input: dict[str, str] = self.rename_keys_input
225321

226-
for old_key, new_key in rename_keys_input.items():
227-
if old_key in data_dict:
228-
data_dict[new_key] = data_dict[old_key]
229-
data_dict.pop(old_key)
230-
else:
231-
msg = f"Key '{old_key}' not found in data. Skipping rename."
232-
raise ValueError(msg)
233-
234-
return Data(**data_dict)
322+
renamed = DataOperationsComponent.rename_keys_recursive(data_dict, rename_keys_input)
323+
return Data(data=renamed)
235324

236325
def recursive_eval(self, data: Any) -> Any:
237326
"""Recursively evaluate string values in a dictionary or list.
@@ -299,13 +388,6 @@ def combine_data(self, *, evaluate: bool | None = None) -> Data:
299388

300389
return Data(**combined_data)
301390

302-
def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
303-
"""Compare values based on the specified operator."""
304-
comparison_func = OPERATORS.get(operator)
305-
if comparison_func:
306-
return comparison_func(item_value, filter_value)
307-
return False
308-
309391
def filter_data(self, input_data: list[dict[str, Any]], filter_key: str, filter_value: str, operator: str) -> list:
310392
"""Filter list data based on key, value, and operator."""
311393
# Validate inputs
@@ -328,6 +410,12 @@ def filter_data(self, input_data: list[dict[str, Any]], filter_key: str, filter_
328410

329411
return filtered_data
330412

413+
def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
    """Apply the named comparison from ``OPERATORS`` to the two values.

    Returns the comparison's result, or ``False`` when ``operator`` has no
    entry in ``OPERATORS``.
    """
    predicate = OPERATORS.get(operator)
    return predicate(item_value, filter_value) if predicate else False
418+
331419
def multi_filter_data(self) -> Data:
332420
"""Apply multiple filters to the data."""
333421
self.validate_single_data("Filter Values")
@@ -366,57 +454,55 @@ def append_update(self) -> Data:
366454

367455
# Configuration and execution methods
368456
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:
369-
"""Update build configuration based on selected action."""
370-
if field_name != "operations":
371-
return build_config
372-
373-
build_config["operations"]["value"] = field_value
374-
selected_actions = [action["name"] for action in field_value]
375-
376-
# Handle single action case
377-
if len(selected_actions) == 1 and selected_actions[0] in ACTION_CONFIG:
378-
action = selected_actions[0]
379-
config = ACTION_CONFIG[action]
380-
381-
build_config["data"]["is_list"] = config["is_list"]
382-
logger.info(config["log_msg"])
383-
384-
return set_current_fields(
385-
build_config=build_config,
386-
action_fields=self.actions_data,
387-
selected_action=action,
388-
default_fields=self.default_keys,
389-
func=set_field_display,
390-
)
391-
392-
# Handle no operations case
393-
if not selected_actions:
394-
logger.info("setting default fields")
395-
return set_current_fields(
396-
build_config=build_config,
397-
action_fields=self.actions_data,
398-
selected_action=None,
399-
default_fields=self.default_keys,
400-
func=set_field_display,
401-
)
457+
if field_name == "operations":
458+
build_config["operations"]["value"] = field_value
459+
selected_actions = [action["name"] for action in field_value]
460+
if len(selected_actions) == 1 and selected_actions[0] in ACTION_CONFIG:
461+
action = selected_actions[0]
462+
config = ACTION_CONFIG[action]
463+
build_config["data"]["is_list"] = config["is_list"]
464+
logger.info(config["log_msg"])
465+
return set_current_fields(
466+
build_config, self.actions_data, action, ["operations", "data"], set_field_display
467+
)
468+
469+
if field_name == "mapped_json_display":
470+
try:
471+
parsed_json = json.loads(field_value)
472+
keys = DataOperationsComponent.extract_all_paths(parsed_json)
473+
build_config["selected_key"]["options"] = keys
474+
build_config["selected_key"]["show"] = True
475+
except (json.JSONDecodeError, TypeError, ValueError) as e:
476+
logger.error(f"Error parsing mapped JSON: {e}")
477+
build_config["selected_key"]["show"] = False
402478

403479
return build_config
404480

481+
def json_path(self) -> Data:
    """Extract the value at the selected jq path from the input data.

    ``selected_key`` is compiled as a jq program and run against the ``data``
    payload of the input (the first item when the input is a list). Only the
    first output of the program is used.

    Returns:
        ``Data`` wrapping the extracted value: dict results are used as the
        payload directly, anything else is stored under ``"result"``. On
        failure the error is written to ``self.status``, logged, and returned
        as ``Data(data={"error": ...})`` instead of being raised.
    """
    try:
        if not self.data or not self.selected_key:
            msg = "Missing input data or selected key."
            raise ValueError(msg)
        input_payload = self.data[0].data if isinstance(self.data, list) else self.data.data
        compiled = jq.compile(self.selected_key)
        try:
            result = compiled.input(input_payload).first()
        except StopIteration:
            # NOTE(review): jq.py's ``first()`` appears to raise StopIteration when
            # the program emits nothing (e.g. ``empty`` or ``.[]`` on an empty list);
            # confirm against the pinned jq version. Convert it so the failure goes
            # through the same reporting path as every other error here.
            msg = f"Path '{self.selected_key}' produced no result."
            raise ValueError(msg) from None
        if isinstance(result, dict):
            return Data(data=result)
        return Data(data={"result": result})
    except (ValueError, TypeError, KeyError) as e:
        self.status = f"Error: {e!s}"
        self.log(self.status)
        return Data(data={"error": str(e)})
496+
405497
def as_data(self) -> Data:
406-
"""Execute the selected action on the data."""
407498
if not hasattr(self, "operations") or not self.operations:
408499
return Data(data={})
409500

410501
selected_actions = [action["name"] for action in self.operations]
411502
logger.info(f"selected_actions: {selected_actions}")
412-
413-
# Only handle single action case for now
414503
if len(selected_actions) != 1:
415504
return Data(data={})
416505

417-
action = selected_actions[0]
418-
419-
# Explicitly type the action_map
420506
action_map: dict[str, Callable[[], Data]] = {
421507
"Select Keys": self.select_keys,
422508
"Literal Eval": self.evaluate_data,
@@ -425,14 +511,14 @@ def as_data(self) -> Data:
425511
"Append or Update": self.append_update,
426512
"Remove Keys": self.remove_keys,
427513
"Rename Keys": self.rename_keys,
514+
"Path Selection": self.json_path,
515+
"JQ Expression": self.json_query,
428516
}
429-
430-
handler: Callable[[], Data] | None = action_map.get(action)
517+
handler: Callable[[], Data] | None = action_map.get(selected_actions[0])
431518
if handler:
432519
try:
433520
return handler()
434521
except Exception as e:
435-
logger.error(f"Error executing {action}: {e!s}")
522+
logger.error(f"Error executing {selected_actions[0]}: {e!s}")
436523
raise
437-
438524
return Data(data={})

0 commit comments

Comments
 (0)