[autofix.ci] apply automated fixes
autofix-ci[bot] authored Sep 3, 2025
commit 4c2d598937d94420bedaed4ab37f61e4f74e0fb8
109 changes: 25 additions & 84 deletions src/backend/base/langflow/components/knowledge_bases/ingestion.py
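This commit contains only mechanical formatting fixes: expressions that the previous formatter wrapped across several lines are joined onto a single line wherever they fit within the project's configured line-length limit (the limit itself is not shown in this diff; a 120-character setting is assumed here). A minimal sketch of the pattern, taken from one call in this file:

# Before: argument list wrapped across three lines
encrypted_api_key = encrypt_api_key(
    api_key_to_save, settings_service=settings_service
)

# After: joined onto a single line within the assumed line-length limit
encrypted_api_key = encrypt_api_key(api_key_to_save, settings_service=settings_service)

The same joining is applied to the embedding option lists, the list comprehensions in _convert_df_to_data_objects, and the log and status-message strings below.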
@@ -98,17 +98,9 @@ class NewKnowledgeBaseInput:
display_name="Choose Embedding",
info="Select the embedding model to use for this knowledge base.",
required=True,
options=OPENAI_EMBEDDING_MODEL_NAMES
+ HUGGINGFACE_MODEL_NAMES
+ COHERE_MODEL_NAMES,
options_metadata=[
{"icon": "OpenAI"}
for _ in OPENAI_EMBEDDING_MODEL_NAMES
]
+ [
{"icon": "HuggingFace"}
for _ in HUGGINGFACE_MODEL_NAMES
]
options=OPENAI_EMBEDDING_MODEL_NAMES + HUGGINGFACE_MODEL_NAMES + COHERE_MODEL_NAMES,
options_metadata=[{"icon": "OpenAI"} for _ in OPENAI_EMBEDDING_MODEL_NAMES]
+ [{"icon": "HuggingFace"} for _ in HUGGINGFACE_MODEL_NAMES]
+ [{"icon": "Cohere"} for _ in COHERE_MODEL_NAMES],
),
"03_api_key": SecretStrInput(
@@ -208,9 +200,7 @@ class NewKnowledgeBaseInput:
]

# ------ Outputs -------------------------------------------------------
outputs = [
Output(display_name="Results", name="dataframe_output", method="build_kb_info")
]
outputs = [Output(display_name="Results", name="dataframe_output", method="build_kb_info")]

# ------ Internal helpers ---------------------------------------------
def _get_kb_root(self) -> Path:
Expand Down Expand Up @@ -301,9 +291,7 @@ def _build_embedding_metadata(self, embedding_model, api_key) -> dict[str, Any]:
if api_key_to_save:
settings_service = get_settings_service()
try:
encrypted_api_key = encrypt_api_key(
api_key_to_save, settings_service=settings_service
)
encrypted_api_key = encrypt_api_key(api_key_to_save, settings_service=settings_service)
except (TypeError, ValueError) as e:
self.log(f"Could not encrypt API key: {e}")
logger.error(f"Could not encrypt API key: {e}")
@@ -317,9 +305,7 @@ def _build_embedding_metadata(self, embedding_model, api_key) -> dict[str, Any]:
"created_at": datetime.now(timezone.utc).isoformat(),
}

def _save_embedding_metadata(
self, kb_path: Path, embedding_model: str, api_key: str
) -> None:
def _save_embedding_metadata(self, kb_path: Path, embedding_model: str, api_key: str) -> None:
"""Save embedding model metadata."""
embedding_metadata = self._build_embedding_metadata(embedding_model, api_key)
metadata_path = kb_path / "embedding_metadata.json"
@@ -344,9 +330,7 @@ def _save_kb_files(
except (OSError, TypeError, ValueError) as e:
self.log(f"Error saving KB files: {e}")

def _build_column_metadata(
self, config_list: list[dict[str, Any]], df_source: pd.DataFrame
) -> dict[str, Any]:
def _build_column_metadata(self, config_list: list[dict[str, Any]], df_source: pd.DataFrame) -> dict[str, Any]:
"""Build detailed column metadata."""
metadata: dict[str, Any] = {
"total_columns": len(df_source.columns),
@@ -358,12 +342,8 @@ def _build_column_metadata(

for config in config_list:
col_name = config.get("column_name")
vectorize = (
config.get("vectorize") == "True" or config.get("vectorize") is True
)
identifier = (
config.get("identifier") == "True" or config.get("identifier") is True
)
vectorize = config.get("vectorize") == "True" or config.get("vectorize") is True
identifier = config.get("identifier") == "True" or config.get("identifier") is True

# Add to columns list
metadata["columns"].append(
@@ -402,9 +382,7 @@ async def _create_vector_store(
embedding_function = self._build_embeddings(embedding_model, api_key)

# Convert DataFrame to Data objects (following Local DB pattern)
data_objects = await self._convert_df_to_data_objects(
df_source, config_list
)
data_objects = await self._convert_df_to_data_objects(df_source, config_list)

# Create vector store
chroma = Chroma(
@@ -422,9 +400,7 @@
# Add documents to vector store
if documents:
chroma.add_documents(documents)
self.log(
f"Added {len(documents)} documents to vector store '{self.knowledge_base}'"
)
self.log(f"Added {len(documents)} documents to vector store '{self.knowledge_base}'")

except (OSError, ValueError, RuntimeError) as e:
self.log(f"Error creating vector store: {e}")
@@ -448,24 +424,16 @@ async def _convert_df_to_data_objects(
all_docs = chroma.get()

# Extract all _id values from metadata
id_list = [
metadata.get("_id")
for metadata in all_docs["metadatas"]
if metadata.get("_id")
]
id_list = [metadata.get("_id") for metadata in all_docs["metadatas"] if metadata.get("_id")]

# Get column roles
content_cols = []
identifier_cols = []

for config in config_list:
col_name = config.get("column_name")
vectorize = (
config.get("vectorize") == "True" or config.get("vectorize") is True
)
identifier = (
config.get("identifier") == "True" or config.get("identifier") is True
)
vectorize = config.get("vectorize") == "True" or config.get("vectorize") is True
identifier = config.get("identifier") == "True" or config.get("identifier") is True

if vectorize:
content_cols.append(col_name)
@@ -475,11 +443,7 @@
# Convert each row to a Data object
for _, row in df_source.iterrows():
# Build content text from identifier columns using list comprehension
identifier_parts = [
str(row[col])
for col in content_cols
if col in row and pd.notna(row[col])
]
identifier_parts = [str(row[col]) for col in content_cols if col in row and pd.notna(row[col])]

# Join all parts into a single string
page_content = " ".join(identifier_parts)
@@ -491,11 +455,7 @@

# Add identifier columns if they exist
if identifier_cols:
identifier_parts = [
str(row[col])
for col in identifier_cols
if col in row and pd.notna(row[col])
]
identifier_parts = [str(row[col]) for col in identifier_cols if col in row and pd.notna(row[col])]
page_content = " ".join(identifier_parts)

# Add metadata columns as simple key-value pairs
@@ -520,9 +480,7 @@

return data_objects

def is_valid_collection_name(
self, name, min_length: int = 3, max_length: int = 63
) -> bool:
def is_valid_collection_name(self, name, min_length: int = 3, max_length: int = 63) -> bool:
"""Validates collection name against conditions 1-3.

1. Contains 3-63 characters
@@ -578,9 +536,7 @@ async def _kb_path(self) -> Path | None:
async def build_kb_info(self) -> Data:
"""Main ingestion routine → returns a dict with KB metadata."""
try:
input_value = (
self.input_df[0] if isinstance(self.input_df, list) else self.input_df
)
input_value = self.input_df[0] if isinstance(self.input_df, list) else self.input_df
df_source: DataFrame = convert_to_dataframe(input_value)

# Validate column configuration (using Structured Output patterns)
@@ -602,9 +558,7 @@
try:
api_key = decrypt_api_key(metadata["api_key"], settings_service)
except (InvalidToken, TypeError, ValueError) as e:
logger.error(
f"Could not decrypt API key. Please provide it manually. Error: {e}"
)
logger.error(f"Could not decrypt API key. Please provide it manually. Error: {e}")

# Check if a custom API key was provided, update metadata if so
if self.api_key:
@@ -616,9 +570,7 @@
)

# Create vector store following Local DB component pattern
await self._create_vector_store(
df_source, config_list, embedding_model=embedding_model, api_key=api_key
)
await self._create_vector_store(df_source, config_list, embedding_model=embedding_model, api_key=api_key)

# Save KB files (using File Component storage patterns)
self._save_kb_files(kb_path, config_list)
@@ -635,9 +587,7 @@
}

# Set status message
self.status = (
f"✅ KB **{self.knowledge_base}** saved · {len(df_source)} chunks."
)
self.status = f"✅ KB **{self.knowledge_base}** saved · {len(df_source)} chunks."

return Data(data=meta)

@@ -685,9 +635,7 @@ async def update_build_config(
if isinstance(field_value, dict) and "01_new_kb_name" in field_value:
# Validate the knowledge base name - Make sure it follows these rules:
if not self.is_valid_collection_name(field_value["01_new_kb_name"]):
msg = (
f"Invalid knowledge base name: {field_value['01_new_kb_name']}"
)
msg = f"Invalid knowledge base name: {field_value['01_new_kb_name']}"
raise ValueError(msg)

api_key = field_value.get("03_api_key", None)
@@ -701,9 +649,7 @@
raise ValueError(msg)

# We need to test the API Key one time against the embedding model
embed_model = self._build_embeddings(
embedding_model=field_value["02_embedding_model"], api_key=api_key
)
embed_model = self._build_embeddings(embedding_model=field_value["02_embedding_model"], api_key=api_key)

# Try to generate a dummy embedding to validate the API key without blocking the event loop
try:
@@ -719,9 +665,7 @@
raise ValueError(msg) from e

# Create the new knowledge base directory
kb_path = (
KNOWLEDGE_BASES_ROOT_PATH / kb_user / field_value["01_new_kb_name"]
)
kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / field_value["01_new_kb_name"]
kb_path.mkdir(parents=True, exist_ok=True)

# Save the embedding metadata
@@ -739,10 +683,7 @@
)

# If the selected knowledge base is not available, reset it
if (
build_config["knowledge_base"]["value"]
not in build_config["knowledge_base"]["options"]
):
if build_config["knowledge_base"]["value"] not in build_config["knowledge_base"]["options"]:
build_config["knowledge_base"]["value"] = None

return build_config
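The body of the dummy-embedding check referenced in update_build_config above (the hunk starting at old line 719) is collapsed in this view. As a rough sketch only, one common way to validate an embedding API key without blocking the event loop is to push the synchronous call onto a worker thread. The helper name below and the use of asyncio.to_thread are assumptions, not the code in this file; it also assumes the embeddings object exposes a synchronous embed_query method, as LangChain embedding classes do:

import asyncio

async def validate_embedding_api_key(embed_model) -> None:
    # Offload the synchronous embed_query call so the event loop stays responsive.
    try:
        await asyncio.to_thread(embed_model.embed_query, "test")
    except Exception as e:  # a real component would narrow this to provider-specific errors
        msg = "Invalid API key or embedding model configuration."
        raise ValueError(msg) from e

The visible `raise ValueError(msg) from e` line in the diff suggests the component wraps the failure similarly; only the offloading pattern is the point of this sketch.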