Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions src/lfx/src/lfx/components/elastic/opensearch_multimodal.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,14 @@ def build_client(self) -> OpenSearch:
# Builds the OpenSearch client, runs document ingestion against it, and returns that client.
# NOTE(review): leading indentation is missing in this view, so statement nesting below is
# inferred from order and blank lines only — confirm against the real file.
@check_cached_vector_store
def build_vector_store(self) -> OpenSearch:
# Return raw OpenSearch client as our "vector store."
# Passes the current ingest_data value to the component log before the client is built.
self.log(self.ingest_data)
client = self.build_client()

# Check if we're in ingestion-only mode (no search query)
# A falsy search_query (e.g. None or "") or a whitespace-only one yields has_search_query == False.
has_search_query = bool((self.search_query or "").strip())
if not has_search_query:
# The flag is only used to emit debug messages here; no other statement in this method reads it.
# NOTE(review): presumably both debug calls belong to this `if` — verify the nesting.
logger.debug("🔄 Ingestion-only mode activated: search operations will be skipped")
logger.debug("Starting ingestion mode...")

# NOTE(review): the embedding object is logged at WARNING level; this looks like diagnostic
# output — confirm the intended level.
logger.warning(f"Embedding: {self.embedding}")
# Ingestion is delegated to _add_documents_to_vector_store using the client built above.
self._add_documents_to_vector_store(client=client)
return client
Expand All @@ -660,25 +666,41 @@ def _add_documents_to_vector_store(self, client: OpenSearch) -> None:
Args:
client: OpenSearch client for performing operations
"""
logger.debug("[INGESTION] _add_documents_to_vector_store called")
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()

logger.debug(
f"[INGESTION] ingest_data type: "
f"{type(self.ingest_data)}, length: {len(self.ingest_data) if self.ingest_data else 0}"
)
logger.debug(
f"[INGESTION] ingest_data content: "
f"{self.ingest_data[:2] if self.ingest_data and len(self.ingest_data) > 0 else 'empty'}"
)

docs = self.ingest_data or []
if not docs:
self.log("No documents to ingest.")
logger.debug("✓ Ingestion complete: No documents provided")
return

if not self.embedding:
msg = "Embedding handle is required to embed documents."
raise ValueError(msg)

# Normalize embedding to list
# Normalize embedding to list first
embeddings_list = self.embedding if isinstance(self.embedding, list) else [self.embedding]

# Filter out None values (fail-safe mode) - do this BEFORE checking if empty
embeddings_list = [e for e in embeddings_list if e is not None]

# NOW check if we have any valid embeddings left after filtering
if not embeddings_list:
msg = "At least one embedding is required to embed documents."
raise ValueError(msg)
logger.warning("All embeddings returned None (fail-safe mode enabled). Skipping document ingestion.")
self.log("Embedding returned None (fail-safe mode enabled). Skipping document ingestion.")
return

logger.debug(f"[INGESTION] Valid embeddings after filtering: {len(embeddings_list)}")
self.log(f"Available embedding models: {len(embeddings_list)}")

# Select the embedding to use for ingestion
Expand Down Expand Up @@ -790,6 +812,7 @@ def _add_documents_to_vector_store(self, client: OpenSearch) -> None:

dynamic_field_name = get_embedding_field_name(embedding_model)

logger.info(f"✓ Selected embedding model for ingestion: '{embedding_model}'")
self.log(f"Using embedding model for ingestion: {embedding_model}")
self.log(f"Dynamic vector field: {dynamic_field_name}")

Expand All @@ -814,6 +837,7 @@ def _add_documents_to_vector_store(self, client: OpenSearch) -> None:
metadatas = []
# Process docs_metadata table input into a dict
additional_metadata = {}
logger.debug(f"[LF] Docs metadata {self.docs_metadata}")
if hasattr(self, "docs_metadata") and self.docs_metadata:
logger.info(f"[LF] Docs metadata {self.docs_metadata}")
if isinstance(self.docs_metadata[-1], Data):
Expand Down Expand Up @@ -956,6 +980,9 @@ def embed_chunk(chunk_text: str) -> list[float]:
)
self.log(metadatas)

logger.info(
f"✓ Ingestion complete: Successfully indexed {len(return_ids)} documents with model '{embedding_model}'"
)
self.log(f"Successfully indexed {len(return_ids)} documents with model {embedding_model}.")

# ---------- helpers for filters ----------
Expand Down Expand Up @@ -1172,6 +1199,11 @@ def search(self, query: str | None = None) -> list[dict[str, Any]]:
msg = "Embedding is required to run hybrid search (KNN + keyword)."
raise ValueError(msg)

# Check if embedding is None (fail-safe mode)
if self.embedding is None or (isinstance(self.embedding, list) and all(e is None for e in self.embedding)):
logger.error("Embedding returned None (fail-safe mode enabled). Cannot perform search.")
return []

# Build filter clauses first so we can use them in model detection
filter_clauses = self._coerce_filter_clauses(filter_obj)

Expand All @@ -1187,6 +1219,14 @@ def search(self, query: str | None = None) -> list[dict[str, Any]]:

# Normalize embedding to list
embeddings_list = self.embedding if isinstance(self.embedding, list) else [self.embedding]
# Filter out None values (fail-safe mode)
embeddings_list = [e for e in embeddings_list if e is not None]

if not embeddings_list:
logger.error(
"No valid embeddings available after filtering None values (fail-safe mode). Cannot perform search."
)
return []

# Create a comprehensive map of model names to embedding objects
# Check all possible identifiers (deployment, model, model_id, model_name)
Expand Down Expand Up @@ -1518,16 +1558,29 @@ def search_documents(self) -> list[Data]:
This is the main interface method that performs the multi-model search using the
configured search_query and returns results in Langflow's Data format.

Always builds the vector store (triggering ingestion if needed), then performs
search only if a query is provided.

Returns:
List of Data objects containing search results with text and metadata

Raises:
Exception: If search operation fails
"""
try:
raw = self.search(self.search_query or "")
# Always build/cache the vector store to ensure ingestion happens
if self._cached_vector_store is None:
self.build_vector_store()

# Only perform search if query is provided
search_query = (self.search_query or "").strip()
if not search_query:
self.log("No search query provided - ingestion completed, returning empty results")
return []

# Perform search with the provided query
raw = self.search(search_query)
return [Data(text=hit["page_content"], **hit["metadata"]) for hit in raw]
self.log(self.ingest_data)
except Exception as e:
self.log(f"search_documents error: {e}")
raise
Expand Down
Loading
Loading