Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/lfx/src/lfx/_assets/component_index.json

Large diffs are not rendered by default.

65 changes: 58 additions & 7 deletions src/lfx/src/lfx/components/elastic/opensearch_multimodal.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,14 @@
@check_cached_vector_store
def build_vector_store(self) -> OpenSearch:
# Return raw OpenSearch client as our "vector store."
self.log(self.ingest_data)
client = self.build_client()

# Check if we're in ingestion-only mode (no search query)
has_search_query = bool((self.search_query or "").strip())
if not has_search_query:
logger.debug("🔄 Ingestion-only mode activated: search operations will be skipped")
logger.debug("Starting ingestion mode...")

logger.warning(f"Embedding: {self.embedding}")
self._add_documents_to_vector_store(client=client)
return client
Expand All @@ -660,25 +666,39 @@
Args:
client: OpenSearch client for performing operations
"""
logger.debug("[INGESTION] _add_documents_to_vector_store called")
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()

logger.debug(
f"[INGESTION] ingest_data type: {type(self.ingest_data)}, length: {len(self.ingest_data) if self.ingest_data else 0}"

Check failure on line 674 in src/lfx/src/lfx/components/elastic/opensearch_multimodal.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (E501)

src/lfx/src/lfx/components/elastic/opensearch_multimodal.py:674:121: E501 Line too long (129 > 120)
)
logger.debug(
f"[INGESTION] ingest_data content: {self.ingest_data[:2] if self.ingest_data and len(self.ingest_data) > 0 else 'empty'}"

Check failure on line 677 in src/lfx/src/lfx/components/elastic/opensearch_multimodal.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (E501)

src/lfx/src/lfx/components/elastic/opensearch_multimodal.py:677:121: E501 Line too long (133 > 120)
)
Comment on lines 673 to 680
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix line length violations to pass linting.

Lines 674 and 677 exceed the 120-character limit, causing the Ruff style check to fail.

Apply this diff to fix the line length:

-        logger.debug(
-            f"[INGESTION] ingest_data type: {type(self.ingest_data)}, length: {len(self.ingest_data) if self.ingest_data else 0}"
-        )
-        logger.debug(
-            f"[INGESTION] ingest_data content: {self.ingest_data[:2] if self.ingest_data and len(self.ingest_data) > 0 else 'empty'}"
-        )
+        ingest_len = len(self.ingest_data) if self.ingest_data else 0
+        logger.debug(f"[INGESTION] ingest_data type: {type(self.ingest_data)}, length: {ingest_len}")
+        ingest_preview = self.ingest_data[:2] if self.ingest_data and len(self.ingest_data) > 0 else "empty"
+        logger.debug(f"[INGESTION] ingest_data content: {ingest_preview}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.debug(
f"[INGESTION] ingest_data type: {type(self.ingest_data)}, length: {len(self.ingest_data) if self.ingest_data else 0}"
)
logger.debug(
f"[INGESTION] ingest_data content: {self.ingest_data[:2] if self.ingest_data and len(self.ingest_data) > 0 else 'empty'}"
)
ingest_len = len(self.ingest_data) if self.ingest_data else 0
logger.debug(f"[INGESTION] ingest_data type: {type(self.ingest_data)}, length: {ingest_len}")
ingest_preview = self.ingest_data[:2] if self.ingest_data and len(self.ingest_data) > 0 else "empty"
logger.debug(f"[INGESTION] ingest_data content: {ingest_preview}")
🧰 Tools
🪛 GitHub Actions: Ruff Style Check

[error] 674-674: Ruff: E501 Line too long (129 > 120).

🪛 GitHub Check: Ruff Style Check (3.13)

[failure] 677-677: Ruff (E501)
src/lfx/src/lfx/components/elastic/opensearch_multimodal.py:677:121: E501 Line too long (133 > 120)


[failure] 674-674: Ruff (E501)
src/lfx/src/lfx/components/elastic/opensearch_multimodal.py:674:121: E501 Line too long (129 > 120)

🤖 Prompt for AI Agents
In src/lfx/src/lfx/components/elastic/opensearch_multimodal.py around lines 673
to 678, the two logger.debug lines exceed the 120-character limit; split or
reformat their f-strings so each line stays under 120 chars (for example, assign
the conditional parts to short local variables or break the f-string into
multiple concatenated parts) and keep the original log content and semantics
intact.


docs = self.ingest_data or []
if not docs:
self.log("No documents to ingest.")
logger.debug("✓ Ingestion complete: No documents provided")
return

if not self.embedding:
msg = "Embedding handle is required to embed documents."
raise ValueError(msg)

# Normalize embedding to list
# Normalize embedding to list first
embeddings_list = self.embedding if isinstance(self.embedding, list) else [self.embedding]

# Filter out None values (fail-safe mode) - do this BEFORE checking if empty
embeddings_list = [e for e in embeddings_list if e is not None]

# NOW check if we have any valid embeddings left after filtering
if not embeddings_list:
msg = "At least one embedding is required to embed documents."
raise ValueError(msg)
logger.warning("All embeddings returned None (fail-safe mode enabled). Skipping document ingestion.")
self.log("Embedding returned None (fail-safe mode enabled). Skipping document ingestion.")
return

logger.debug(f"[INGESTION] Valid embeddings after filtering: {len(embeddings_list)}")
self.log(f"Available embedding models: {len(embeddings_list)}")

# Select the embedding to use for ingestion
Expand Down Expand Up @@ -790,6 +810,7 @@

dynamic_field_name = get_embedding_field_name(embedding_model)

logger.info(f"✓ Selected embedding model for ingestion: '{embedding_model}'")
self.log(f"Using embedding model for ingestion: {embedding_model}")
self.log(f"Dynamic vector field: {dynamic_field_name}")

Expand All @@ -814,6 +835,7 @@
metadatas = []
# Process docs_metadata table input into a dict
additional_metadata = {}
logger.debug(f"[LF] Docs metadata {self.docs_metadata}")
if hasattr(self, "docs_metadata") and self.docs_metadata:
logger.info(f"[LF] Docs metadata {self.docs_metadata}")
if isinstance(self.docs_metadata[-1], Data):
Expand Down Expand Up @@ -956,6 +978,9 @@
)
self.log(metadatas)

logger.info(
f"✓ Ingestion complete: Successfully indexed {len(return_ids)} documents with model '{embedding_model}'"
)
self.log(f"Successfully indexed {len(return_ids)} documents with model {embedding_model}.")

# ---------- helpers for filters ----------
Expand Down Expand Up @@ -1172,6 +1197,11 @@
msg = "Embedding is required to run hybrid search (KNN + keyword)."
raise ValueError(msg)

# Check if embedding is None (fail-safe mode)
if self.embedding is None or (isinstance(self.embedding, list) and all(e is None for e in self.embedding)):
logger.error("Embedding returned None (fail-safe mode enabled). Cannot perform search.")
return []

# Build filter clauses first so we can use them in model detection
filter_clauses = self._coerce_filter_clauses(filter_obj)

Expand All @@ -1187,6 +1217,14 @@

# Normalize embedding to list
embeddings_list = self.embedding if isinstance(self.embedding, list) else [self.embedding]
# Filter out None values (fail-safe mode)
embeddings_list = [e for e in embeddings_list if e is not None]

if not embeddings_list:
logger.error(
"No valid embeddings available after filtering None values (fail-safe mode). Cannot perform search."
)
return []

# Create a comprehensive map of model names to embedding objects
# Check all possible identifiers (deployment, model, model_id, model_name)
Expand Down Expand Up @@ -1518,16 +1556,29 @@
This is the main interface method that performs the multi-model search using the
configured search_query and returns results in Langflow's Data format.
Always builds the vector store (triggering ingestion if needed), then performs
search only if a query is provided.
Returns:
List of Data objects containing search results with text and metadata
Raises:
Exception: If search operation fails
"""
try:
raw = self.search(self.search_query or "")
# Always build/cache the vector store to ensure ingestion happens
if self._cached_vector_store is None:
self.build_vector_store()

# Only perform search if query is provided
search_query = (self.search_query or "").strip()
if not search_query:
self.log("No search query provided - ingestion completed, returning empty results")
return []

# Perform search with the provided query
raw = self.search(search_query)
return [Data(text=hit["page_content"], **hit["metadata"]) for hit in raw]
self.log(self.ingest_data)
except Exception as e:
self.log(f"search_documents error: {e}")
raise
Expand Down
Loading
Loading