Merged

Commits (28)
2a0dabd  fix: Knowledge base component refactor (erichare, Aug 26, 2025)
78cd280  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 26, 2025)
84e6c05  [autofix.ci] apply automated fixes (attempt 2/3) (autofix-ci[bot], Aug 26, 2025)
b7d033b  Merge branch 'main' into fix-kb-adjustments (erichare, Aug 26, 2025)
9f3fe45  Merge branch 'main' into fix-kb-adjustments (erichare, Aug 26, 2025)
def90ae  Merge branch 'main' into fix-kb-adjustments (edwinjosechittilappilly, Aug 26, 2025)
29799fe  Update styleUtils.ts (erichare, Aug 26, 2025)
a050287  Update ingestion.py (erichare, Aug 26, 2025)
ba11333  Merge branch 'main' into fix-kb-adjustments (carlosrcoelho, Aug 26, 2025)
9adc5cc  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 26, 2025)
9b90f04  Fix ingestion of df (erichare, Aug 26, 2025)
95249d7  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 26, 2025)
338b4ce  Update Knowledge Ingestion.json (erichare, Aug 26, 2025)
5b9d1a8  Fix one failing test (erichare, Aug 26, 2025)
dd61055  Merge branch 'main' into fix-kb-adjustments (erichare, Aug 27, 2025)
88d6a4e  Merge branch 'main' into fix-kb-adjustments (erichare, Aug 27, 2025)
585fc24  Merge branch 'release-1.6.0' into fix-kb-adjustments (erichare, Aug 27, 2025)
dfb2c4e  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 27, 2025)
4a05cdc  Merge branch 'release-1.6.0' into fix-kb-adjustments (erichare, Aug 27, 2025)
b512dbb  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 27, 2025)
9bcb694  Revert composio versions for CI (erichare, Aug 27, 2025)
ad7e5dd  Revert "Revert composio versions for CI" (erichare, Aug 27, 2025)
00f5ccc  Merge branch 'release-1.6.0' into fix-kb-adjustments (erichare, Aug 27, 2025)
6727a36  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 27, 2025)
ea4ede2  Update Vector Store RAG.json (erichare, Aug 27, 2025)
68faa9a  [autofix.ci] apply automated fixes (autofix-ci[bot], Aug 27, 2025)
47c5c68  Update starter-projects.spec.ts (erichare, Aug 27, 2025)
ec30942  Update starter-projects.spec.ts (erichare, Aug 27, 2025)
Empty file.
4 changes: 0 additions & 4 deletions src/backend/base/langflow/components/data/__init__.py
@@ -3,8 +3,6 @@
 from .directory import DirectoryComponent
 from .file import FileComponent
 from .json_to_data import JSONToDataComponent
-from .kb_ingest import KBIngestionComponent
-from .kb_retrieval import KBRetrievalComponent
 from .news_search import NewsSearchComponent
 from .rss import RSSReaderComponent
 from .sql_executor import SQLComponent
@@ -18,8 +16,6 @@
     "DirectoryComponent",
     "FileComponent",
     "JSONToDataComponent",
-    "KBIngestionComponent",
-    "KBRetrievalComponent",
     "NewsSearchComponent",
     "RSSReaderComponent",
     "SQLComponent",
34 changes: 34 additions & 0 deletions src/backend/base/langflow/components/knowledge_bases/__init__.py
@@ -0,0 +1,34 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from langflow.components._importing import import_mod

if TYPE_CHECKING:
    from langflow.components.knowledge_bases.ingestion import KnowledgeIngestionComponent
    from langflow.components.knowledge_bases.retrieval import KnowledgeRetrievalComponent

_dynamic_imports = {
    "KnowledgeIngestionComponent": "ingestion",
    "KnowledgeRetrievalComponent": "retrieval",
}

__all__ = ["KnowledgeIngestionComponent", "KnowledgeRetrievalComponent"]


def __getattr__(attr_name: str) -> Any:
    """Lazily import input/output components on attribute access."""
    if attr_name not in _dynamic_imports:
        msg = f"module '{__name__}' has no attribute '{attr_name}'"
        raise AttributeError(msg)
    try:
        result = import_mod(attr_name, _dynamic_imports[attr_name], __spec__.parent)
    except (ModuleNotFoundError, ImportError, AttributeError) as e:
        msg = f"Could not import '{attr_name}' from '{__name__}': {e}"
        raise AttributeError(msg) from e
    globals()[attr_name] = result
    return result


def __dir__() -> list[str]:
    return list(__all__)
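A minimal usage sketch of the lazy loader above; the import path comes from this PR, but the snippet itself is illustrative and assumes a working Langflow install:

# Illustrative only, not part of the PR. The first attribute access falls through to
# __getattr__, which resolves the submodule via import_mod and caches the class in the
# module globals; later lookups hit the cached attribute directly.
import langflow.components.knowledge_bases as kb

ingestion_cls = kb.KnowledgeIngestionComponent      # triggers the lazy import
assert "KnowledgeIngestionComponent" in vars(kb)    # now cached in globals()
print(dir(kb))  # ['KnowledgeIngestionComponent', 'KnowledgeRetrievalComponent'] via __dir__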
src/backend/base/langflow/components/knowledge_bases/ingestion.py
@@ -16,10 +16,10 @@
 from langchain_chroma import Chroma
 from loguru import logger

-from langflow.base.data.kb_utils import get_knowledge_bases
+from langflow.base.knowledge_bases.knowledge_base_utils import get_knowledge_bases
 from langflow.base.models.openai_constants import OPENAI_EMBEDDING_MODEL_NAMES
 from langflow.custom import Component
-from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, Output, SecretStrInput, StrInput, TableInput
+from langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, Output, SecretStrInput, StrInput, TableInput
 from langflow.schema.data import Data
 from langflow.schema.dotdict import dotdict  # noqa: TC001
 from langflow.schema.table import EditMode
@@ -38,14 +38,14 @@
 KNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()


-class KBIngestionComponent(Component):
+class KnowledgeIngestionComponent(Component):
     """Create or append to Langflow Knowledge from a DataFrame."""

     # ------ UI metadata ---------------------------------------------------
     display_name = "Knowledge Ingestion"
     description = "Create or update knowledge in Langflow."
-    icon = "database"
-    name = "KBIngestion"
+    icon = "upload"
+    name = "KnowledgeIngestion"

     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -101,12 +101,18 @@ class NewKnowledgeBaseInput:
             required=True,
             options=[],
             refresh_button=True,
+            real_time_refresh=True,
             dialog_inputs=asdict(NewKnowledgeBaseInput()),
         ),
-        DataFrameInput(
+        HandleInput(
             name="input_df",
-            display_name="Data",
-            info="Table with all original columns (already chunked / processed).",
+            display_name="Input",
+            info=(
+                "Table with all original columns (already chunked / processed). "
+                "Accepts Data or DataFrame. If Data is provided, it is converted to a DataFrame automatically."
+            ),
+            input_types=["Data", "DataFrame"],
+            is_list=True,
             required=True,
Review comment (Contributor):
⚠️ Potential issue

Input handling misses list[DataFrame] support; can crash downstream.

The HandleInput advertises input_types ["Data", "DataFrame"] with is_list=True, but build_kb_info only handles Data, list[Data], or single DataFrame. Passing list[DataFrame] leaves df_source as a Python list, breaking validation and ingestion.

Apply this diff to support list[DataFrame] (and mixed Data/DataFrame lists defensively):

@@ async def build_kb_info(self) -> Data:
-            if isinstance(self.input_df, Data):
-                df_source: pd.DataFrame = self.input_df.to_dataframe()
-            elif isinstance(self.input_df, list) and all(isinstance(item, Data) for item in self.input_df):
-                # If input_df is a list of Data objects, concatenate them into a single DataFrame
-                df_source: pd.DataFrame = pd.concat([item.to_dataframe() for item in self.input_df], ignore_index=True)
-            else:
-                df_source: pd.DataFrame = self.input_df
+            if isinstance(self.input_df, Data):
+                df_source: pd.DataFrame = self.input_df.to_dataframe()
+            elif isinstance(self.input_df, list):
+                # Accept lists of Data or DataFrame (or a mix); normalize to DataFrame
+                frames: list[pd.DataFrame] = []
+                for item in self.input_df:
+                    if isinstance(item, Data):
+                        frames.append(item.to_dataframe())
+                    elif isinstance(item, pd.DataFrame):
+                        frames.append(item)
+                    else:
+                        msg = f"Unsupported input type in list: {type(item).__name__}. Expected Data or DataFrame."
+                        raise ValueError(msg)
+                if not frames:
+                    raise ValueError("Empty input list provided for 'input_df'.")
+                df_source = pd.concat(frames, ignore_index=True)
+            elif isinstance(self.input_df, pd.DataFrame):
+                df_source = self.input_df
+            else:
+                msg = f"Unsupported input type for 'input_df': {type(self.input_df).__name__}. Expected Data or DataFrame."
+                raise ValueError(msg)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
         HandleInput(
             name="input_df",
-            display_name="Data",
-            info="Table with all original columns (already chunked / processed).",
+            display_name="Input",
+            info=(
+                "Table with all original columns (already chunked / processed). "
+                "Accepts Data or DataFrame. If Data is provided, it is converted to a DataFrame automatically."
+            ),
             input_types=["Data", "DataFrame"],
             is_list=True,
             required=True,

    async def build_kb_info(self) -> Data:
        # … previous logic …
        if isinstance(self.input_df, Data):
            df_source: pd.DataFrame = self.input_df.to_dataframe()
        elif isinstance(self.input_df, list):
            # Accept lists of Data or DataFrame (or a mix); normalize to a single DataFrame
            frames: list[pd.DataFrame] = []
            for item in self.input_df:
                if isinstance(item, Data):
                    frames.append(item.to_dataframe())
                elif isinstance(item, pd.DataFrame):
                    frames.append(item)
                else:
                    msg = (
                        f"Unsupported input type in list: {type(item).__name__}. "
                        "Expected Data or DataFrame."
                    )
                    raise ValueError(msg)
            if not frames:
                raise ValueError("Empty input list provided for 'input_df'.")
            df_source = pd.concat(frames, ignore_index=True)
        elif isinstance(self.input_df, pd.DataFrame):
            df_source = self.input_df
        else:
            msg = (
                f"Unsupported input type for 'input_df': {type(self.input_df).__name__}. "
                "Expected Data or DataFrame."
            )
            raise ValueError(msg)
        # … subsequent logic …
🤖 Prompt for AI Agents
In src/backend/base/langflow/components/knowledge_bases/ingestion.py around
lines 107-116, build_kb_info currently accepts ["Data","DataFrame"] with
is_list=True but only handles list[Data] or single DataFrame; this misses
list[DataFrame] and mixed lists causing df_source to remain a raw Python list.
Fix by normalizing incoming values: if input is a list, iterate elements and
classify each element as Data (leave as-is) or DataFrame (convert to Data by
records dicts or collect DataFrames); if all elements are DataFrames, concat
them into a single DataFrame and proceed down the single-DataFrame branch;
otherwise convert DataFrame elements to Data and treat the whole list as
list[Data] so existing list-Data handling works; add defensive type checks and
clear error messages for unsupported element types.

         ),
         TableInput(
@@ -504,7 +510,13 @@ async def build_kb_info(self) -> Data:
         """Main ingestion routine → returns a dict with KB metadata."""
         try:
             # Get source DataFrame
-            df_source: pd.DataFrame = self.input_df
+            if isinstance(self.input_df, Data):
+                df_source: pd.DataFrame = self.input_df.to_dataframe()
+            elif isinstance(self.input_df, list) and all(isinstance(item, Data) for item in self.input_df):
+                # If input_df is a list of Data objects, concatenate them into a single DataFrame
+                df_source: pd.DataFrame = pd.concat([item.to_dataframe() for item in self.input_df], ignore_index=True)
+            else:
+                df_source: pd.DataFrame = self.input_df

             # Validate column configuration (using Structured Output patterns)
             config_list = self._validate_column_config(df_source)
@@ -559,9 +571,8 @@ async def build_kb_info(self) -> Data:
             return Data(data=meta)

         except (OSError, ValueError, RuntimeError, KeyError) as e:
-            self.log(f"Error in KB ingestion: {e}")
-            self.status = f"❌ KB ingestion failed: {e}"
-            return Data(data={"error": str(e), "kb_name": self.knowledge_base})
+            msg = f"Error during KB ingestion: {e}"
+            raise RuntimeError(msg) from e

     async def _get_api_key_variable(self, field_value: dict[str, Any]):
         async with session_scope() as db:
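The error path above now raises instead of returning an error payload. A minimal caller-side sketch of the new behavior follows; the caller code is hypothetical and not part of this PR:

import asyncio

async def ingest(component) -> None:
    # Hypothetical caller; "component" is an already-configured KnowledgeIngestionComponent.
    # A failed ingestion used to return Data(data={"error": ..., "kb_name": ...});
    # after this change it surfaces as a RuntimeError chained to the original exception.
    try:
        result = await component.build_kb_info()
        print(result.data)  # KB metadata on success
    except RuntimeError as exc:
        print(f"KB ingestion failed: {exc}")

# asyncio.run(ingest(component))  # component construction omitted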
src/backend/base/langflow/components/knowledge_bases/retrieval.py
@@ -7,7 +7,7 @@
 from loguru import logger
 from pydantic import SecretStr

-from langflow.base.data.kb_utils import get_knowledge_bases
+from langflow.base.knowledge_bases.knowledge_base_utils import get_knowledge_bases
 from langflow.custom import Component
 from langflow.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput
 from langflow.schema.data import Data
@@ -24,11 +24,11 @@
 KNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()


-class KBRetrievalComponent(Component):
+class KnowledgeRetrievalComponent(Component):
     display_name = "Knowledge Retrieval"
     description = "Search and retrieve data from knowledge."
-    icon = "database"
-    name = "KBRetrieval"
+    icon = "download"
+    name = "KnowledgeRetrieval"

     inputs = [
         DropdownInput(
@@ -51,6 +51,7 @@ class KBRetrievalComponent:
             name="search_query",
             display_name="Search Query",
             info="Optional search query to filter knowledge base data.",
+            tool_mode=True,
         ),
         IntInput(
             name="top_k",
@@ -63,17 +64,24 @@
         BoolInput(
             name="include_metadata",
             display_name="Include Metadata",
-            info="Whether to include all metadata and embeddings in the output. If false, only content is returned.",
+            info="Whether to include all metadata in the output. If false, only content is returned.",
             value=True,
             advanced=False,
         ),
+        BoolInput(
+            name="include_embeddings",
+            display_name="Include Embeddings",
+            info="Whether to include embeddings in the output. Only applicable if 'Include Metadata' is enabled.",
+            value=False,
+            advanced=True,
+        ),
     ]

     outputs = [
         Output(
-            name="chroma_kb_data",
+            name="retrieve_data",
             display_name="Results",
-            method="get_chroma_kb_data",
+            method="retrieve_data",
             info="Returns the data from the selected knowledge base.",
         ),
     ]
@@ -162,7 +170,7 @@ def _build_embeddings(self, metadata: dict):
             msg = f"Embedding provider '{provider}' is not supported for retrieval."
             raise NotImplementedError(msg)

-    async def get_chroma_kb_data(self) -> DataFrame:
+    async def retrieve_data(self) -> DataFrame:
         """Retrieve data from the selected knowledge base by reading the Chroma collection.

         Returns:
@@ -212,16 +220,16 @@ async def get_chroma_kb_data(self) -> DataFrame:
             # For each result, make it a tuple to match the expected output format
             results = [(doc, 0) for doc in results]  # Assign a dummy score of 0

-        # If metadata is enabled, get embeddings for the results
+        # If include_embeddings is enabled, get embeddings for the results
         id_to_embedding = {}
-        if self.include_metadata and results:
+        if self.include_embeddings and results:
             doc_ids = [doc[0].metadata.get("_id") for doc in results if doc[0].metadata.get("_id")]

             # Only proceed if we have valid document IDs
             if doc_ids:
                 # Access underlying client to get embeddings
                 collection = chroma._client.get_collection(name=self.knowledge_base)
-                embeddings_result = collection.get(where={"_id": {"$in": doc_ids}}, include=["embeddings", "metadatas"])
+                embeddings_result = collection.get(where={"_id": {"$in": doc_ids}}, include=["metadatas", "embeddings"])

                 # Create a mapping from document ID to embedding
                 for i, metadata in enumerate(embeddings_result.get("metadatas", [])):
@@ -231,20 +239,16 @@
         # Build output data based on include_metadata setting
         data_list = []
         for doc in results:
+            kwargs = {
+                "content": doc[0].page_content,
+            }
+            if self.search_query:
+                kwargs["_score"] = -1 * doc[1]
             if self.include_metadata:
-                # Include all metadata, embeddings, and content
-                kwargs = {
-                    "content": doc[0].page_content,
-                    **doc[0].metadata,
-                }
-                if self.search_query:
-                    kwargs["_score"] = -1 * doc[1]
+                kwargs.update(doc[0].metadata)
+                if self.include_embeddings:
+                    kwargs["_embeddings"] = id_to_embedding.get(doc[0].metadata.get("_id"))
-            else:
-                # Only include content
-                kwargs = {
-                    "content": doc[0].page_content,
-                }

             data_list.append(Data(**kwargs))

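For reviewers tracing the renamed output downstream, a brief consumer-side sketch of how the new flags shape each returned row; the wiring is hypothetical and not part of this PR:

async def inspect_results(component) -> None:
    # Hypothetical consumer; "component" is an already-configured KnowledgeRetrievalComponent.
    frame = await component.retrieve_data()  # renamed from get_chroma_kb_data
    # Every row carries "content"; "_score" (the negated search score) is present only when
    # a search_query was set; metadata columns appear only when include_metadata is True;
    # "_embeddings" is added only when include_metadata and include_embeddings are both True.
    print(frame.columns.tolist())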