diff --git a/src/backend/base/langflow/base/knowledge_bases/__init__.py b/src/backend/base/langflow/base/knowledge_bases/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/src/backend/base/langflow/base/data/kb_utils.py b/src/backend/base/langflow/base/knowledge_bases/knowledge_base_utils.py
similarity index 100%
rename from src/backend/base/langflow/base/data/kb_utils.py
rename to src/backend/base/langflow/base/knowledge_bases/knowledge_base_utils.py
diff --git a/src/backend/base/langflow/components/data/__init__.py b/src/backend/base/langflow/components/data/__init__.py
index 4f589c37f974..6e90f042685e 100644
--- a/src/backend/base/langflow/components/data/__init__.py
+++ b/src/backend/base/langflow/components/data/__init__.py
@@ -3,8 +3,6 @@
 from .directory import DirectoryComponent
 from .file import FileComponent
 from .json_to_data import JSONToDataComponent
-from .kb_ingest import KBIngestionComponent
-from .kb_retrieval import KBRetrievalComponent
 from .news_search import NewsSearchComponent
 from .rss import RSSReaderComponent
 from .sql_executor import SQLComponent
@@ -18,8 +16,6 @@
     "DirectoryComponent",
     "FileComponent",
     "JSONToDataComponent",
-    "KBIngestionComponent",
-    "KBRetrievalComponent",
     "NewsSearchComponent",
     "RSSReaderComponent",
     "SQLComponent",
diff --git a/src/backend/base/langflow/components/knowledge_bases/__init__.py b/src/backend/base/langflow/components/knowledge_bases/__init__.py
new file mode 100644
index 000000000000..b14f3c5c3b75
--- /dev/null
+++ b/src/backend/base/langflow/components/knowledge_bases/__init__.py
@@ -0,0 +1,34 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from langflow.components._importing import import_mod
+
+if TYPE_CHECKING:
+    from langflow.components.knowledge_bases.ingestion import KnowledgeIngestionComponent
+    from langflow.components.knowledge_bases.retrieval import KnowledgeRetrievalComponent
+
+_dynamic_imports = {
+    "KnowledgeIngestionComponent": "ingestion",
+    "KnowledgeRetrievalComponent": "retrieval",
+}
+
+__all__ = ["KnowledgeIngestionComponent", "KnowledgeRetrievalComponent"]
+
+
+def __getattr__(attr_name: str) -> Any:
+    """Lazily import knowledge base components on attribute access."""
+    if attr_name not in _dynamic_imports:
+        msg = f"module '{__name__}' has no attribute '{attr_name}'"
+        raise AttributeError(msg)
+    try:
+        result = import_mod(attr_name, _dynamic_imports[attr_name], __spec__.parent)
+    except (ModuleNotFoundError, ImportError, AttributeError) as e:
+        msg = f"Could not import '{attr_name}' from '{__name__}': {e}"
+        raise AttributeError(msg) from e
+    globals()[attr_name] = result
+    return result
+
+
+def __dir__() -> list[str]:
+    return list(__all__)
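The new package `__init__` relies on module-level `__getattr__` (PEP 562): symbols listed in `__all__` are resolved on first attribute access and then cached in `globals()`, so importing the package itself stays cheap. A minimal self-contained sketch of the same idiom, using a hypothetical `heavy` submodule exporting `HeavyThing` in place of the real component modules:

```python
# package/__init__.py — lazy-import sketch (hypothetical names)
from __future__ import annotations

import importlib
from typing import Any

_dynamic_imports = {"HeavyThing": "heavy"}  # exported symbol -> submodule

__all__ = ["HeavyThing"]


def __getattr__(attr_name: str) -> Any:
    """Resolve exported names on first access instead of at import time."""
    if attr_name not in _dynamic_imports:
        msg = f"module '{__name__}' has no attribute '{attr_name}'"
        raise AttributeError(msg)
    module = importlib.import_module(f".{_dynamic_imports[attr_name]}", __spec__.parent)
    result = getattr(module, attr_name)
    globals()[attr_name] = result  # cache so later lookups bypass __getattr__
    return result


def __dir__() -> list[str]:
    return list(__all__)
```

Langflow's `import_mod` plays roughly the role `importlib.import_module` plays here, with the extra step of translating import failures into `AttributeError` so attribute-probing tooling degrades gracefully.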
diff --git a/src/backend/base/langflow/components/data/kb_ingest.py b/src/backend/base/langflow/components/knowledge_bases/ingestion.py
similarity index 95%
rename from src/backend/base/langflow/components/data/kb_ingest.py
rename to src/backend/base/langflow/components/knowledge_bases/ingestion.py
index 4a734ba81a2a..7a2078a8e231 100644
--- a/src/backend/base/langflow/components/data/kb_ingest.py
+++ b/src/backend/base/langflow/components/knowledge_bases/ingestion.py
@@ -9,17 +9,18 @@
 from dataclasses import asdict, dataclass, field
 from datetime import datetime, timezone
 from pathlib import Path
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
 import pandas as pd
 from cryptography.fernet import InvalidToken
 from langchain_chroma import Chroma
 from loguru import logger
 
-from langflow.base.data.kb_utils import get_knowledge_bases
+from langflow.base.knowledge_bases.knowledge_base_utils import get_knowledge_bases
 from langflow.base.models.openai_constants import OPENAI_EMBEDDING_MODEL_NAMES
+from langflow.components.processing.converter import convert_to_dataframe
 from langflow.custom import Component
-from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, Output, SecretStrInput, StrInput, TableInput
+from langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, Output, SecretStrInput, StrInput, TableInput
 from langflow.schema.data import Data
 from langflow.schema.dotdict import dotdict  # noqa: TC001
 from langflow.schema.table import EditMode
@@ -27,6 +28,9 @@
 from langflow.services.database.models.user.crud import get_user_by_id
 from langflow.services.deps import get_settings_service, get_variable_service, session_scope
 
+if TYPE_CHECKING:
+    from langflow.schema.dataframe import DataFrame
+
 HUGGINGFACE_MODEL_NAMES = ["sentence-transformers/all-MiniLM-L6-v2", "sentence-transformers/all-mpnet-base-v2"]
 COHERE_MODEL_NAMES = ["embed-english-v3.0", "embed-multilingual-v3.0"]
@@ -38,14 +42,14 @@
 KNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()
 
 
-class KBIngestionComponent(Component):
+class KnowledgeIngestionComponent(Component):
     """Create or append to Langflow Knowledge from a DataFrame."""
 
     # ------ UI metadata ---------------------------------------------------
     display_name = "Knowledge Ingestion"
     description = "Create or update knowledge in Langflow."
-    icon = "database"
-    name = "KBIngestion"
+    icon = "upload"
+    name = "KnowledgeIngestion"
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -101,12 +105,17 @@ class NewKnowledgeBaseInput:
             required=True,
             options=[],
             refresh_button=True,
+            real_time_refresh=True,
             dialog_inputs=asdict(NewKnowledgeBaseInput()),
         ),
-        DataFrameInput(
+        HandleInput(
             name="input_df",
-            display_name="Data",
-            info="Table with all original columns (already chunked / processed).",
+            display_name="Input",
+            info=(
+                "Table with all original columns (already chunked / processed). "
+                "Accepts Data or DataFrame. If Data is provided, it is converted to a DataFrame automatically."
+            ),
+            input_types=["Data", "DataFrame"],
             required=True,
         ),
         TableInput(
@@ -171,7 +180,7 @@
     ]
 
     # ------ Outputs -------------------------------------------------------
-    outputs = [Output(display_name="DataFrame", name="dataframe", method="build_kb_info")]
+    outputs = [Output(display_name="Results", name="dataframe_output", method="build_kb_info")]
 
     # ------ Internal helpers ---------------------------------------------
     def _get_kb_root(self) -> Path:
@@ -503,8 +512,8 @@ async def _kb_path(self) -> Path | None:
     async def build_kb_info(self) -> Data:
         """Main ingestion routine → returns a dict with KB metadata."""
         try:
-            # Get source DataFrame
-            df_source: pd.DataFrame = self.input_df
+            input_value = self.input_df[0] if isinstance(self.input_df, list) else self.input_df
+            df_source: DataFrame = convert_to_dataframe(input_value)
 
             # Validate column configuration (using Structured Output patterns)
             config_list = self._validate_column_config(df_source)
@@ -559,9 +568,8 @@ async def build_kb_info(self) -> Data:
             return Data(data=meta)
 
         except (OSError, ValueError, RuntimeError, KeyError) as e:
-            self.log(f"Error in KB ingestion: {e}")
-            self.status = f"❌ KB ingestion failed: {e}"
-            return Data(data={"error": str(e), "kb_name": self.knowledge_base})
+            msg = f"Error during KB ingestion: {e}"
+            raise RuntimeError(msg) from e
 
     async def _get_api_key_variable(self, field_value: dict[str, Any]):
         async with session_scope() as db:
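Switching `input_df` from `DataFrameInput` to a `HandleInput` with `input_types=["Data", "DataFrame"]` means the component can no longer assume a DataFrame arrives on the edge. A standalone sketch of the normalization this implies, using plain pandas with a dict standing in for a `Data` record (a hypothetical helper, not Langflow's `convert_to_dataframe`):

```python
import pandas as pd


def normalize_input(value: pd.DataFrame | dict | list) -> pd.DataFrame:
    """Coerce a Data-like record or a DataFrame into a DataFrame."""
    if isinstance(value, list):  # multiple inbound edges can arrive as a list; take the first
        value = value[0]
    if isinstance(value, pd.DataFrame):
        return value
    if isinstance(value, dict):  # a single record becomes a one-row table
        return pd.DataFrame([value])
    msg = f"Unsupported input type: {type(value).__name__}"
    raise TypeError(msg)
```

Under these assumptions, `normalize_input({"text": "hello"})` yields a one-row DataFrame with a `text` column, which is the shape the column configuration below expects.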
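Worth noting from the unchanged middle of this file: each row's `_id` is a SHA-256 hash of the content selected for vectorization, and with `allow_duplicates` off, rows whose hash already exists in the Chroma collection are skipped. A condensed sketch of that scheme, with simplified signatures:

```python
import hashlib

import pandas as pd


def rows_to_ingest(df: pd.DataFrame, existing_ids: set[str], content_cols: list[str]) -> list[dict]:
    """Hash each row's vectorized content and drop rows that are already stored."""
    kept = []
    for _, row in df.iterrows():
        page_content = " ".join(str(row[c]) for c in content_cols if c in row and pd.notna(row[c]))
        row_id = hashlib.sha256(page_content.encode()).hexdigest()
        if row_id in existing_ids:
            continue  # identical content already ingested; skip when duplicates are disallowed
        kept.append({"text": page_content, "_id": row_id})
    return kept
```

Because the hash covers only the vectorized columns, two rows that differ solely in metadata hash identically and are treated as duplicates by this scheme.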
diff --git a/src/backend/base/langflow/components/data/kb_retrieval.py b/src/backend/base/langflow/components/knowledge_bases/retrieval.py
similarity index 88%
rename from src/backend/base/langflow/components/data/kb_retrieval.py
rename to src/backend/base/langflow/components/knowledge_bases/retrieval.py
index d633c62b0404..6c5fa387393a 100644
--- a/src/backend/base/langflow/components/data/kb_retrieval.py
+++ b/src/backend/base/langflow/components/knowledge_bases/retrieval.py
@@ -7,7 +7,7 @@
 from loguru import logger
 from pydantic import SecretStr
 
-from langflow.base.data.kb_utils import get_knowledge_bases
+from langflow.base.knowledge_bases.knowledge_base_utils import get_knowledge_bases
 from langflow.custom import Component
 from langflow.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput
 from langflow.schema.data import Data
@@ -24,11 +24,11 @@
 KNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()
 
 
-class KBRetrievalComponent(Component):
+class KnowledgeRetrievalComponent(Component):
     display_name = "Knowledge Retrieval"
     description = "Search and retrieve data from knowledge."
-    icon = "database"
-    name = "KBRetrieval"
+    icon = "download"
+    name = "KnowledgeRetrieval"
 
     inputs = [
         DropdownInput(
@@ -51,6 +51,7 @@
             name="search_query",
             display_name="Search Query",
             info="Optional search query to filter knowledge base data.",
+            tool_mode=True,
         ),
         IntInput(
             name="top_k",
@@ -63,17 +64,24 @@
         BoolInput(
             name="include_metadata",
             display_name="Include Metadata",
-            info="Whether to include all metadata and embeddings in the output. If false, only content is returned.",
+            info="Whether to include all metadata in the output. If false, only content is returned.",
             value=True,
             advanced=False,
         ),
+        BoolInput(
+            name="include_embeddings",
+            display_name="Include Embeddings",
+            info="Whether to include embeddings in the output. Only applicable if 'Include Metadata' is enabled.",
+            value=False,
+            advanced=True,
+        ),
     ]
 
     outputs = [
         Output(
-            name="chroma_kb_data",
+            name="retrieve_data",
             display_name="Results",
-            method="get_chroma_kb_data",
+            method="retrieve_data",
             info="Returns the data from the selected knowledge base.",
         ),
     ]
@@ -162,7 +170,7 @@
             msg = f"Embedding provider '{provider}' is not supported for retrieval."
             raise NotImplementedError(msg)
 
-    async def get_chroma_kb_data(self) -> DataFrame:
+    async def retrieve_data(self) -> DataFrame:
         """Retrieve data from the selected knowledge base by reading the Chroma collection.
 
         Returns:
@@ -212,16 +220,16 @@
             # For each result, make it a tuple to match the expected output format
             results = [(doc, 0) for doc in results]  # Assign a dummy score of 0
 
-        # If metadata is enabled, get embeddings for the results
+        # If include_embeddings is enabled, get embeddings for the results
         id_to_embedding = {}
-        if self.include_metadata and results:
+        if self.include_embeddings and results:
             doc_ids = [doc[0].metadata.get("_id") for doc in results if doc[0].metadata.get("_id")]
 
             # Only proceed if we have valid document IDs
             if doc_ids:
                 # Access underlying client to get embeddings
                 collection = chroma._client.get_collection(name=self.knowledge_base)
-                embeddings_result = collection.get(where={"_id": {"$in": doc_ids}}, include=["embeddings", "metadatas"])
+                embeddings_result = collection.get(where={"_id": {"$in": doc_ids}}, include=["metadatas", "embeddings"])
 
                 # Create a mapping from document ID to embedding
                 for i, metadata in enumerate(embeddings_result.get("metadatas", [])):
@@ -231,20 +239,16 @@
         # Build output data based on include_metadata setting
         data_list = []
         for doc in results:
+            kwargs = {
+                "content": doc[0].page_content,
+            }
+            if self.search_query:
+                kwargs["_score"] = -1 * doc[1]
             if self.include_metadata:
                 # Include all metadata, embeddings, and content
-                kwargs = {
-                    "content": doc[0].page_content,
-                    **doc[0].metadata,
-                }
-                if self.search_query:
-                    kwargs["_score"] = -1 * doc[1]
+                kwargs.update(doc[0].metadata)
+                if self.include_embeddings:
                     kwargs["_embeddings"] = id_to_embedding.get(doc[0].metadata.get("_id"))
-            else:
-                # Only include content
-                kwargs = {
-                    "content": doc[0].page_content,
-                }
 
             data_list.append(Data(**kwargs))
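The retrieval rewrite hoists the shared `content` and `_score` fields out of the `include_metadata` branch and gates embeddings behind the new flag. The same logic, sketched as a pure function over (document, score) pairs, with a small dataclass standing in for LangChain's `Document` and plain dicts standing in for Langflow's `Data` objects:

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Stand-in for a LangChain Document."""

    page_content: str
    metadata: dict = field(default_factory=dict)


def build_rows(
    results: list[tuple[Doc, float]],
    *,
    search_query: str = "",
    include_metadata: bool = True,
    include_embeddings: bool = False,
    id_to_embedding: dict | None = None,
) -> list[dict]:
    rows = []
    for doc, score in results:
        row = {"content": doc.page_content}
        if search_query:  # Chroma returns distances; negate so larger means more similar
            row["_score"] = -1 * score
        if include_metadata:
            row.update(doc.metadata)
            if include_embeddings:  # embeddings only ride along with metadata
                row["_embeddings"] = (id_to_embedding or {}).get(doc.metadata.get("_id"))
        rows.append(row)
    return rows
```

For example, `build_rows([(Doc("hello", {"_id": "h1"}), 0.2)], search_query="hi")` returns `[{"content": "hello", "_score": -0.2, "_id": "h1"}]`, matching the component's output shape.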
"fieldName": "data_inputs", - "id": "SplitText-gvHe2", + "id": "SplitText-edq28", "inputTypes": [ "Data", "DataFrame", @@ -24,20 +24,19 @@ "type": "other" } }, - "id": "reactflow__edge-URLComponent-6JEUC{œdataTypeœ:œURLComponentœ,œidœ:œURLComponent-6JEUCœ,œnameœ:œpage_resultsœ,œoutput_typesœ:[œDataFrameœ]}-SplitText-gvHe2{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-gvHe2œ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", + "id": "reactflow__edge-URLComponent-WBxJx{œdataTypeœ:œURLComponentœ,œidœ:œURLComponent-WBxJxœ,œnameœ:œpage_resultsœ,œoutput_typesœ:[œDataFrameœ]}-SplitText-edq28{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-edq28œ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", "selected": false, - "source": "URLComponent-6JEUC", - "sourceHandle": "{œdataTypeœ: œURLComponentœ, œidœ: œURLComponent-6JEUCœ, œnameœ: œpage_resultsœ, œoutput_typesœ: [œDataFrameœ]}", - "target": "SplitText-gvHe2", - "targetHandle": "{œfieldNameœ: œdata_inputsœ, œidœ: œSplitText-gvHe2œ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}" + "source": "URLComponent-WBxJx", + "sourceHandle": "{œdataTypeœ: œURLComponentœ, œidœ: œURLComponent-WBxJxœ, œnameœ: œpage_resultsœ, œoutput_typesœ: [œDataFrameœ]}", + "target": "SplitText-edq28", + "targetHandle": "{œfieldNameœ: œdata_inputsœ, œidœ: œSplitText-edq28œ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}" }, { - "animated": false, "className": "", "data": { "sourceHandle": { "dataType": "SplitText", - "id": "SplitText-gvHe2", + "id": "SplitText-edq28", "name": "dataframe", "output_types": [ "DataFrame" @@ -45,25 +44,25 @@ }, "targetHandle": { "fieldName": "input_df", - "id": "KBIngestion-jj5iW", + "id": "KnowledgeIngestion-uSOy6", "inputTypes": [ + "Data", "DataFrame" ], "type": "other" } }, - "id": "xy-edge__SplitText-gvHe2{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-gvHe2œ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-KBIngestion-jj5iW{œfieldNameœ:œinput_dfœ,œidœ:œKBIngestion-jj5iWœ,œinputTypesœ:[œDataFrameœ],œtypeœ:œotherœ}", - "selected": false, - "source": "SplitText-gvHe2", - "sourceHandle": "{œdataTypeœ: œSplitTextœ, œidœ: œSplitText-gvHe2œ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}", - "target": "KBIngestion-jj5iW", - "targetHandle": "{œfieldNameœ: œinput_dfœ, œidœ: œKBIngestion-jj5iWœ, œinputTypesœ: [œDataFrameœ], œtypeœ: œotherœ}" + "id": "xy-edge__SplitText-edq28{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-edq28œ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-KnowledgeIngestion-uSOy6{œfieldNameœ:œinput_dfœ,œidœ:œKnowledgeIngestion-uSOy6œ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}", + "source": "SplitText-edq28", + "sourceHandle": "{œdataTypeœ: œSplitTextœ, œidœ: œSplitText-edq28œ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}", + "target": "KnowledgeIngestion-uSOy6", + "targetHandle": "{œfieldNameœ: œinput_dfœ, œidœ: œKnowledgeIngestion-uSOy6œ, œinputTypesœ: [œDataœ, œDataFrameœ], œtypeœ: œotherœ}" } ], "nodes": [ { "data": { - "id": "SplitText-gvHe2", + "id": "SplitText-edq28", "node": { "base_classes": [ "DataFrame" @@ -280,7 +279,7 @@ "type": "SplitText" }, "dragging": false, - "id": "SplitText-gvHe2", + "id": "SplitText-edq28", "measured": { "height": 413, "width": 320 @@ -294,7 +293,7 @@ }, { "data": { - "id": "note-bpWz8", + "id": "note-httIY", "node": { "description": "## Knowledge Ingestion\n\nThis flow shows the basics of the creation and ingestion of knowledge bases in Langflow. 
Here we use the `URL` component to dynamically fetch page data from the Langflow website, split it into chunks of 100 tokens, then ingest into a Knowledge Base.\n\n1. (Optional) Change the URL or switch to a different input data source as desired.\n2. (Optional) Adjust the Chunk Size as desired.\n3. Select or Create a new knowledge base.\n4. Ensure the column you wish to Vectorize is properly reflected in the Column Configuration table.", "display_name": "", @@ -305,7 +304,7 @@ }, "dragging": false, "height": 401, - "id": "note-bpWz8", + "id": "note-httIY", "measured": { "height": 401, "width": 388 @@ -315,13 +314,13 @@ "y": 75.97023827444744 }, "resizing": false, - "selected": true, + "selected": false, "type": "noteNode", "width": 388 }, { "data": { - "id": "URLComponent-6JEUC", + "id": "URLComponent-WBxJx", "node": { "base_classes": [ "DataFrame", @@ -697,7 +696,7 @@ "type": "URLComponent" }, "dragging": false, - "id": "URLComponent-6JEUC", + "id": "URLComponent-WBxJx", "measured": { "height": 292, "width": 320 @@ -711,7 +710,7 @@ }, { "data": { - "id": "KBIngestion-jj5iW", + "id": "KnowledgeIngestion-uSOy6", "node": { "base_classes": [ "Data" @@ -732,11 +731,11 @@ "allow_duplicates" ], "frozen": false, - "icon": "database", - "last_updated": "2025-08-13T19:45:49.122Z", + "icon": "upload", + "last_updated": "2025-08-26T18:21:03.358Z", "legacy": false, "metadata": { - "code_hash": "6c62063f2c09", + "code_hash": "ce9549373934", "dependencies": { "dependencies": [ { @@ -774,7 +773,7 @@ ], "total_dependencies": 8 }, - "module": "langflow.components.data.kb_ingest.KBIngestionComponent" + "module": "langflow.components.knowledge_bases.ingestion.KnowledgeIngestionComponent" }, "minimized": false, "output_types": [], @@ -782,10 +781,10 @@ { "allows_loop": false, "cache": true, - "display_name": "DataFrame", + "display_name": "Results", "group_outputs": false, "method": "build_kb_info", - "name": "dataframe", + "name": "dataframe_output", "selected": "Data", "tool_mode": true, "types": [ @@ -866,7 +865,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nimport asyncio\nimport contextlib\nimport hashlib\nimport json\nimport re\nimport uuid\nfrom dataclasses import asdict, dataclass, field\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom typing import Any\n\nimport pandas as pd\nfrom cryptography.fernet import InvalidToken\nfrom langchain_chroma import Chroma\nfrom loguru import logger\n\nfrom langflow.base.data.kb_utils import get_knowledge_bases\nfrom langflow.base.models.openai_constants import OPENAI_EMBEDDING_MODEL_NAMES\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, Output, SecretStrInput, StrInput, TableInput\nfrom langflow.schema.data import Data\nfrom langflow.schema.dotdict import dotdict # noqa: TC001\nfrom langflow.schema.table import EditMode\nfrom langflow.services.auth.utils import decrypt_api_key, encrypt_api_key\nfrom langflow.services.database.models.user.crud import get_user_by_id\nfrom langflow.services.deps import get_settings_service, get_variable_service, session_scope\n\nHUGGINGFACE_MODEL_NAMES = [\"sentence-transformers/all-MiniLM-L6-v2\", \"sentence-transformers/all-mpnet-base-v2\"]\nCOHERE_MODEL_NAMES = [\"embed-english-v3.0\", \"embed-multilingual-v3.0\"]\n\nsettings = get_settings_service().settings\nknowledge_directory = settings.knowledge_bases_dir\nif not knowledge_directory:\n msg = \"Knowledge bases directory is not set 
in the settings.\"\n raise ValueError(msg)\nKNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()\n\n\nclass KBIngestionComponent(Component):\n \"\"\"Create or append to Langflow Knowledge from a DataFrame.\"\"\"\n\n # ------ UI metadata ---------------------------------------------------\n display_name = \"Knowledge Ingestion\"\n description = \"Create or update knowledge in Langflow.\"\n icon = \"database\"\n name = \"KBIngestion\"\n\n def __init__(self, *args, **kwargs) -> None:\n super().__init__(*args, **kwargs)\n self._cached_kb_path: Path | None = None\n\n @dataclass\n class NewKnowledgeBaseInput:\n functionality: str = \"create\"\n fields: dict[str, dict] = field(\n default_factory=lambda: {\n \"data\": {\n \"node\": {\n \"name\": \"create_knowledge_base\",\n \"description\": \"Create new knowledge in Langflow.\",\n \"display_name\": \"Create new knowledge\",\n \"field_order\": [\"01_new_kb_name\", \"02_embedding_model\", \"03_api_key\"],\n \"template\": {\n \"01_new_kb_name\": StrInput(\n name=\"new_kb_name\",\n display_name=\"Knowledge Name\",\n info=\"Name of the new knowledge to create.\",\n required=True,\n ),\n \"02_embedding_model\": DropdownInput(\n name=\"embedding_model\",\n display_name=\"Model Name\",\n info=\"Select the embedding model to use for this knowledge base.\",\n required=True,\n options=OPENAI_EMBEDDING_MODEL_NAMES + HUGGINGFACE_MODEL_NAMES + COHERE_MODEL_NAMES,\n options_metadata=[{\"icon\": \"OpenAI\"} for _ in OPENAI_EMBEDDING_MODEL_NAMES]\n + [{\"icon\": \"HuggingFace\"} for _ in HUGGINGFACE_MODEL_NAMES]\n + [{\"icon\": \"Cohere\"} for _ in COHERE_MODEL_NAMES],\n ),\n \"03_api_key\": SecretStrInput(\n name=\"api_key\",\n display_name=\"API Key\",\n info=\"Provider API key for embedding model\",\n required=True,\n load_from_db=False,\n ),\n },\n },\n }\n }\n )\n\n # ------ Inputs --------------------------------------------------------\n inputs = [\n DropdownInput(\n name=\"knowledge_base\",\n display_name=\"Knowledge\",\n info=\"Select the knowledge to load data from.\",\n required=True,\n options=[],\n refresh_button=True,\n dialog_inputs=asdict(NewKnowledgeBaseInput()),\n ),\n DataFrameInput(\n name=\"input_df\",\n display_name=\"Data\",\n info=\"Table with all original columns (already chunked / processed).\",\n required=True,\n ),\n TableInput(\n name=\"column_config\",\n display_name=\"Column Configuration\",\n info=\"Configure column behavior for the knowledge base.\",\n required=True,\n table_schema=[\n {\n \"name\": \"column_name\",\n \"display_name\": \"Column Name\",\n \"type\": \"str\",\n \"description\": \"Name of the column in the source DataFrame\",\n \"edit_mode\": EditMode.INLINE,\n },\n {\n \"name\": \"vectorize\",\n \"display_name\": \"Vectorize\",\n \"type\": \"boolean\",\n \"description\": \"Create embeddings for this column\",\n \"default\": False,\n \"edit_mode\": EditMode.INLINE,\n },\n {\n \"name\": \"identifier\",\n \"display_name\": \"Identifier\",\n \"type\": \"boolean\",\n \"description\": \"Use this column as unique identifier\",\n \"default\": False,\n \"edit_mode\": EditMode.INLINE,\n },\n ],\n value=[\n {\n \"column_name\": \"text\",\n \"vectorize\": True,\n \"identifier\": True,\n },\n ],\n ),\n IntInput(\n name=\"chunk_size\",\n display_name=\"Chunk Size\",\n info=\"Batch size for processing embeddings\",\n advanced=True,\n value=1000,\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Embedding Provider API Key\",\n info=\"API key for the embedding provider to generate embeddings.\",\n 
advanced=True,\n required=False,\n ),\n BoolInput(\n name=\"allow_duplicates\",\n display_name=\"Allow Duplicates\",\n info=\"Allow duplicate rows in the knowledge base\",\n advanced=True,\n value=False,\n ),\n ]\n\n # ------ Outputs -------------------------------------------------------\n outputs = [Output(display_name=\"DataFrame\", name=\"dataframe\", method=\"build_kb_info\")]\n\n # ------ Internal helpers ---------------------------------------------\n def _get_kb_root(self) -> Path:\n \"\"\"Return the root directory for knowledge bases.\"\"\"\n return KNOWLEDGE_BASES_ROOT_PATH\n\n def _validate_column_config(self, df_source: pd.DataFrame) -> list[dict[str, Any]]:\n \"\"\"Validate column configuration using Structured Output patterns.\"\"\"\n if not self.column_config:\n msg = \"Column configuration cannot be empty\"\n raise ValueError(msg)\n\n # Convert table input to list of dicts (similar to Structured Output)\n config_list = self.column_config if isinstance(self.column_config, list) else []\n\n # Validate column names exist in DataFrame\n df_columns = set(df_source.columns)\n for config in config_list:\n col_name = config.get(\"column_name\")\n if col_name not in df_columns:\n msg = f\"Column '{col_name}' not found in DataFrame. Available columns: {sorted(df_columns)}\"\n raise ValueError(msg)\n\n return config_list\n\n def _get_embedding_provider(self, embedding_model: str) -> str:\n \"\"\"Get embedding provider by matching model name to lists.\"\"\"\n if embedding_model in OPENAI_EMBEDDING_MODEL_NAMES:\n return \"OpenAI\"\n if embedding_model in HUGGINGFACE_MODEL_NAMES:\n return \"HuggingFace\"\n if embedding_model in COHERE_MODEL_NAMES:\n return \"Cohere\"\n return \"Custom\"\n\n def _build_embeddings(self, embedding_model: str, api_key: str):\n \"\"\"Build embedding model using provider patterns.\"\"\"\n # Get provider by matching model name to lists\n provider = self._get_embedding_provider(embedding_model)\n\n # Validate provider and model\n if provider == \"OpenAI\":\n from langchain_openai import OpenAIEmbeddings\n\n if not api_key:\n msg = \"OpenAI API key is required when using OpenAI provider\"\n raise ValueError(msg)\n return OpenAIEmbeddings(\n model=embedding_model,\n api_key=api_key,\n chunk_size=self.chunk_size,\n )\n if provider == \"HuggingFace\":\n from langchain_huggingface import HuggingFaceEmbeddings\n\n return HuggingFaceEmbeddings(\n model=embedding_model,\n )\n if provider == \"Cohere\":\n from langchain_cohere import CohereEmbeddings\n\n if not api_key:\n msg = \"Cohere API key is required when using Cohere provider\"\n raise ValueError(msg)\n return CohereEmbeddings(\n model=embedding_model,\n cohere_api_key=api_key,\n )\n if provider == \"Custom\":\n # For custom embedding models, we would need additional configuration\n msg = \"Custom embedding models not yet supported\"\n raise NotImplementedError(msg)\n msg = f\"Unknown provider: {provider}\"\n raise ValueError(msg)\n\n def _build_embedding_metadata(self, embedding_model, api_key) -> dict[str, Any]:\n \"\"\"Build embedding model metadata.\"\"\"\n # Get provider by matching model name to lists\n embedding_provider = self._get_embedding_provider(embedding_model)\n\n api_key_to_save = None\n if api_key and hasattr(api_key, \"get_secret_value\"):\n api_key_to_save = api_key.get_secret_value()\n elif isinstance(api_key, str):\n api_key_to_save = api_key\n\n encrypted_api_key = None\n if api_key_to_save:\n settings_service = get_settings_service()\n try:\n encrypted_api_key = 
encrypt_api_key(api_key_to_save, settings_service=settings_service)\n except (TypeError, ValueError) as e:\n self.log(f\"Could not encrypt API key: {e}\")\n logger.error(f\"Could not encrypt API key: {e}\")\n\n return {\n \"embedding_provider\": embedding_provider,\n \"embedding_model\": embedding_model,\n \"api_key\": encrypted_api_key,\n \"api_key_used\": bool(api_key),\n \"chunk_size\": self.chunk_size,\n \"created_at\": datetime.now(timezone.utc).isoformat(),\n }\n\n def _save_embedding_metadata(self, kb_path: Path, embedding_model: str, api_key: str) -> None:\n \"\"\"Save embedding model metadata.\"\"\"\n embedding_metadata = self._build_embedding_metadata(embedding_model, api_key)\n metadata_path = kb_path / \"embedding_metadata.json\"\n metadata_path.write_text(json.dumps(embedding_metadata, indent=2))\n\n def _save_kb_files(\n self,\n kb_path: Path,\n config_list: list[dict[str, Any]],\n ) -> None:\n \"\"\"Save KB files using File Component storage patterns.\"\"\"\n try:\n # Create directory (following File Component patterns)\n kb_path.mkdir(parents=True, exist_ok=True)\n\n # Save column configuration\n # Only do this if the file doesn't exist already\n cfg_path = kb_path / \"schema.json\"\n if not cfg_path.exists():\n cfg_path.write_text(json.dumps(config_list, indent=2))\n\n except (OSError, TypeError, ValueError) as e:\n self.log(f\"Error saving KB files: {e}\")\n\n def _build_column_metadata(self, config_list: list[dict[str, Any]], df_source: pd.DataFrame) -> dict[str, Any]:\n \"\"\"Build detailed column metadata.\"\"\"\n metadata: dict[str, Any] = {\n \"total_columns\": len(df_source.columns),\n \"mapped_columns\": len(config_list),\n \"unmapped_columns\": len(df_source.columns) - len(config_list),\n \"columns\": [],\n \"summary\": {\"vectorized_columns\": [], \"identifier_columns\": []},\n }\n\n for config in config_list:\n col_name = config.get(\"column_name\")\n vectorize = config.get(\"vectorize\") == \"True\" or config.get(\"vectorize\") is True\n identifier = config.get(\"identifier\") == \"True\" or config.get(\"identifier\") is True\n\n # Add to columns list\n metadata[\"columns\"].append(\n {\n \"name\": col_name,\n \"vectorize\": vectorize,\n \"identifier\": identifier,\n }\n )\n\n # Update summary\n if vectorize:\n metadata[\"summary\"][\"vectorized_columns\"].append(col_name)\n if identifier:\n metadata[\"summary\"][\"identifier_columns\"].append(col_name)\n\n return metadata\n\n async def _create_vector_store(\n self, df_source: pd.DataFrame, config_list: list[dict[str, Any]], embedding_model: str, api_key: str\n ) -> None:\n \"\"\"Create vector store following Local DB component pattern.\"\"\"\n try:\n # Set up vector store directory\n vector_store_dir = await self._kb_path()\n if not vector_store_dir:\n msg = \"Knowledge base path is not set. 
Please create a new knowledge base first.\"\n raise ValueError(msg)\n vector_store_dir.mkdir(parents=True, exist_ok=True)\n\n # Create embeddings model\n embedding_function = self._build_embeddings(embedding_model, api_key)\n\n # Convert DataFrame to Data objects (following Local DB pattern)\n data_objects = await self._convert_df_to_data_objects(df_source, config_list)\n\n # Create vector store\n chroma = Chroma(\n persist_directory=str(vector_store_dir),\n embedding_function=embedding_function,\n collection_name=self.knowledge_base,\n )\n\n # Convert Data objects to LangChain Documents\n documents = []\n for data_obj in data_objects:\n doc = data_obj.to_lc_document()\n documents.append(doc)\n\n # Add documents to vector store\n if documents:\n chroma.add_documents(documents)\n self.log(f\"Added {len(documents)} documents to vector store '{self.knowledge_base}'\")\n\n except (OSError, ValueError, RuntimeError) as e:\n self.log(f\"Error creating vector store: {e}\")\n\n async def _convert_df_to_data_objects(\n self, df_source: pd.DataFrame, config_list: list[dict[str, Any]]\n ) -> list[Data]:\n \"\"\"Convert DataFrame to Data objects for vector store.\"\"\"\n data_objects: list[Data] = []\n\n # Set up vector store directory\n kb_path = await self._kb_path()\n\n # If we don't allow duplicates, we need to get the existing hashes\n chroma = Chroma(\n persist_directory=str(kb_path),\n collection_name=self.knowledge_base,\n )\n\n # Get all documents and their metadata\n all_docs = chroma.get()\n\n # Extract all _id values from metadata\n id_list = [metadata.get(\"_id\") for metadata in all_docs[\"metadatas\"] if metadata.get(\"_id\")]\n\n # Get column roles\n content_cols = []\n identifier_cols = []\n\n for config in config_list:\n col_name = config.get(\"column_name\")\n vectorize = config.get(\"vectorize\") == \"True\" or config.get(\"vectorize\") is True\n identifier = config.get(\"identifier\") == \"True\" or config.get(\"identifier\") is True\n\n if vectorize:\n content_cols.append(col_name)\n elif identifier:\n identifier_cols.append(col_name)\n\n # Convert each row to a Data object\n for _, row in df_source.iterrows():\n # Build content text from identifier columns using list comprehension\n identifier_parts = [str(row[col]) for col in content_cols if col in row and pd.notna(row[col])]\n\n # Join all parts into a single string\n page_content = \" \".join(identifier_parts)\n\n # Build metadata from NON-vectorized columns only (simple key-value pairs)\n data_dict = {\n \"text\": page_content, # Main content for vectorization\n }\n\n # Add identifier columns if they exist\n if identifier_cols:\n identifier_parts = [str(row[col]) for col in identifier_cols if col in row and pd.notna(row[col])]\n page_content = \" \".join(identifier_parts)\n\n # Add metadata columns as simple key-value pairs\n for col in df_source.columns:\n if col not in content_cols and col in row and pd.notna(row[col]):\n # Convert to simple types for Chroma metadata\n value = row[col]\n data_dict[col] = str(value) # Convert complex types to string\n\n # Hash the page_content for unique ID\n page_content_hash = hashlib.sha256(page_content.encode()).hexdigest()\n data_dict[\"_id\"] = page_content_hash\n\n # If duplicates are disallowed, and hash exists, prevent adding this row\n if not self.allow_duplicates and page_content_hash in id_list:\n self.log(f\"Skipping duplicate row with hash {page_content_hash}\")\n continue\n\n # Create Data object - everything except \"text\" becomes metadata\n data_obj = 
Data(data=data_dict)\n data_objects.append(data_obj)\n\n return data_objects\n\n def is_valid_collection_name(self, name, min_length: int = 3, max_length: int = 63) -> bool:\n \"\"\"Validates collection name against conditions 1-3.\n\n 1. Contains 3-63 characters\n 2. Starts and ends with alphanumeric character\n 3. Contains only alphanumeric characters, underscores, or hyphens.\n\n Args:\n name (str): Collection name to validate\n min_length (int): Minimum length of the name\n max_length (int): Maximum length of the name\n\n Returns:\n bool: True if valid, False otherwise\n \"\"\"\n # Check length (condition 1)\n if not (min_length <= len(name) <= max_length):\n return False\n\n # Check start/end with alphanumeric (condition 2)\n if not (name[0].isalnum() and name[-1].isalnum()):\n return False\n\n # Check allowed characters (condition 3)\n return re.match(r\"^[a-zA-Z0-9_-]+$\", name) is not None\n\n async def _kb_path(self) -> Path | None:\n # Check if we already have the path cached\n cached_path = getattr(self, \"_cached_kb_path\", None)\n if cached_path is not None:\n return cached_path\n\n # If not cached, compute it\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching knowledge base path.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n\n kb_root = self._get_kb_root()\n\n # Cache the result\n self._cached_kb_path = kb_root / kb_user / self.knowledge_base\n\n return self._cached_kb_path\n\n # ---------------------------------------------------------------------\n # OUTPUT METHODS\n # ---------------------------------------------------------------------\n async def build_kb_info(self) -> Data:\n \"\"\"Main ingestion routine → returns a dict with KB metadata.\"\"\"\n try:\n # Get source DataFrame\n df_source: pd.DataFrame = self.input_df\n\n # Validate column configuration (using Structured Output patterns)\n config_list = self._validate_column_config(df_source)\n column_metadata = self._build_column_metadata(config_list, df_source)\n\n # Read the embedding info from the knowledge base folder\n kb_path = await self._kb_path()\n if not kb_path:\n msg = \"Knowledge base path is not set. Please create a new knowledge base first.\"\n raise ValueError(msg)\n metadata_path = kb_path / \"embedding_metadata.json\"\n\n # If the API key is not provided, try to read it from the metadata file\n if metadata_path.exists():\n settings_service = get_settings_service()\n metadata = json.loads(metadata_path.read_text())\n embedding_model = metadata.get(\"embedding_model\")\n try:\n api_key = decrypt_api_key(metadata[\"api_key\"], settings_service)\n except (InvalidToken, TypeError, ValueError) as e:\n logger.error(f\"Could not decrypt API key. Please provide it manually. 
Error: {e}\")\n\n # Check if a custom API key was provided, update metadata if so\n if self.api_key:\n api_key = self.api_key\n self._save_embedding_metadata(\n kb_path=kb_path,\n embedding_model=embedding_model,\n api_key=api_key,\n )\n\n # Create vector store following Local DB component pattern\n await self._create_vector_store(df_source, config_list, embedding_model=embedding_model, api_key=api_key)\n\n # Save KB files (using File Component storage patterns)\n self._save_kb_files(kb_path, config_list)\n\n # Build metadata response\n meta: dict[str, Any] = {\n \"kb_id\": str(uuid.uuid4()),\n \"kb_name\": self.knowledge_base,\n \"rows\": len(df_source),\n \"column_metadata\": column_metadata,\n \"path\": str(kb_path),\n \"config_columns\": len(config_list),\n \"timestamp\": datetime.now(tz=timezone.utc).isoformat(),\n }\n\n # Set status message\n self.status = f\"✅ KB **{self.knowledge_base}** saved · {len(df_source)} chunks.\"\n\n return Data(data=meta)\n\n except (OSError, ValueError, RuntimeError, KeyError) as e:\n self.log(f\"Error in KB ingestion: {e}\")\n self.status = f\"❌ KB ingestion failed: {e}\"\n return Data(data={\"error\": str(e), \"kb_name\": self.knowledge_base})\n\n async def _get_api_key_variable(self, field_value: dict[str, Any]):\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching global variables.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n variable_service = get_variable_service()\n\n # Process the api_key field variable\n return await variable_service.get_variable(\n user_id=current_user.id,\n name=field_value[\"03_api_key\"],\n field=\"\",\n session=db,\n )\n\n async def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any,\n field_name: str | None = None,\n ) -> dotdict:\n \"\"\"Update build configuration based on provider selection.\"\"\"\n # Create a new knowledge base\n if field_name == \"knowledge_base\":\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching knowledge base list.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n if isinstance(field_value, dict) and \"01_new_kb_name\" in field_value:\n # Validate the knowledge base name - Make sure it follows these rules:\n if not self.is_valid_collection_name(field_value[\"01_new_kb_name\"]):\n msg = f\"Invalid knowledge base name: {field_value['01_new_kb_name']}\"\n raise ValueError(msg)\n\n api_key = field_value.get(\"03_api_key\", None)\n with contextlib.suppress(Exception):\n # If the API key is a variable, resolve it\n api_key = await self._get_api_key_variable(field_value)\n\n # Make sure api_key is a string\n if not isinstance(api_key, str):\n msg = \"API key must be a string.\"\n raise ValueError(msg)\n\n # We need to test the API Key one time against the embedding model\n embed_model = self._build_embeddings(embedding_model=field_value[\"02_embedding_model\"], api_key=api_key)\n\n # Try to generate a dummy embedding to validate the API key without blocking the event loop\n try:\n await asyncio.wait_for(\n asyncio.to_thread(embed_model.embed_query, \"test\"),\n timeout=10,\n )\n except TimeoutError as e:\n msg = \"Embedding validation timed out. 
Please verify network connectivity and key.\"\n raise ValueError(msg) from e\n except Exception as e:\n msg = f\"Embedding validation failed: {e!s}\"\n raise ValueError(msg) from e\n\n # Create the new knowledge base directory\n kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / field_value[\"01_new_kb_name\"]\n kb_path.mkdir(parents=True, exist_ok=True)\n\n # Save the embedding metadata\n build_config[\"knowledge_base\"][\"value\"] = field_value[\"01_new_kb_name\"]\n self._save_embedding_metadata(\n kb_path=kb_path,\n embedding_model=field_value[\"02_embedding_model\"],\n api_key=api_key,\n )\n\n # Update the knowledge base options dynamically\n build_config[\"knowledge_base\"][\"options\"] = await get_knowledge_bases(\n KNOWLEDGE_BASES_ROOT_PATH,\n user_id=self.user_id,\n )\n\n # If the selected knowledge base is not available, reset it\n if build_config[\"knowledge_base\"][\"value\"] not in build_config[\"knowledge_base\"][\"options\"]:\n build_config[\"knowledge_base\"][\"value\"] = None\n\n return build_config\n"
+              "value": "from __future__ import annotations\n\nimport asyncio\nimport contextlib\nimport hashlib\nimport json\nimport re\nimport uuid\nfrom dataclasses import asdict, dataclass, field\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom typing import TYPE_CHECKING, Any\n\nimport pandas as pd\nfrom cryptography.fernet import InvalidToken\nfrom langchain_chroma import Chroma\nfrom loguru import logger\n\nfrom langflow.base.knowledge_bases.knowledge_base_utils import get_knowledge_bases\nfrom langflow.base.models.openai_constants import OPENAI_EMBEDDING_MODEL_NAMES\nfrom langflow.components.processing.converter import convert_to_dataframe\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, Output, SecretStrInput, StrInput, TableInput\nfrom langflow.schema.data import Data\nfrom langflow.schema.dotdict import dotdict # noqa: TC001\nfrom langflow.schema.table import EditMode\nfrom langflow.services.auth.utils import decrypt_api_key, encrypt_api_key\nfrom langflow.services.database.models.user.crud import get_user_by_id\nfrom langflow.services.deps import get_settings_service, get_variable_service, session_scope\n\nif TYPE_CHECKING:\n from langflow.schema.dataframe import DataFrame\n\nHUGGINGFACE_MODEL_NAMES = [\"sentence-transformers/all-MiniLM-L6-v2\", \"sentence-transformers/all-mpnet-base-v2\"]\nCOHERE_MODEL_NAMES = [\"embed-english-v3.0\", \"embed-multilingual-v3.0\"]\n\nsettings = get_settings_service().settings\nknowledge_directory = settings.knowledge_bases_dir\nif not knowledge_directory:\n msg = \"Knowledge bases directory is not set in the settings.\"\n raise ValueError(msg)\nKNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()\n\n\nclass KnowledgeIngestionComponent(Component):\n \"\"\"Create or append to Langflow Knowledge from a DataFrame.\"\"\"\n\n # ------ UI metadata ---------------------------------------------------\n display_name = \"Knowledge Ingestion\"\n description = \"Create or update knowledge in Langflow.\"\n icon = \"upload\"\n name = \"KnowledgeIngestion\"\n\n def __init__(self, *args, **kwargs) -> None:\n super().__init__(*args, **kwargs)\n self._cached_kb_path: Path | None = None\n\n @dataclass\n class NewKnowledgeBaseInput:\n functionality: str = \"create\"\n fields: dict[str, dict] = field(\n default_factory=lambda: {\n \"data\": {\n \"node\": {\n \"name\": \"create_knowledge_base\",\n \"description\": \"Create new knowledge in Langflow.\",\n 
\"display_name\": \"Create new knowledge\",\n \"field_order\": [\"01_new_kb_name\", \"02_embedding_model\", \"03_api_key\"],\n \"template\": {\n \"01_new_kb_name\": StrInput(\n name=\"new_kb_name\",\n display_name=\"Knowledge Name\",\n info=\"Name of the new knowledge to create.\",\n required=True,\n ),\n \"02_embedding_model\": DropdownInput(\n name=\"embedding_model\",\n display_name=\"Model Name\",\n info=\"Select the embedding model to use for this knowledge base.\",\n required=True,\n options=OPENAI_EMBEDDING_MODEL_NAMES + HUGGINGFACE_MODEL_NAMES + COHERE_MODEL_NAMES,\n options_metadata=[{\"icon\": \"OpenAI\"} for _ in OPENAI_EMBEDDING_MODEL_NAMES]\n + [{\"icon\": \"HuggingFace\"} for _ in HUGGINGFACE_MODEL_NAMES]\n + [{\"icon\": \"Cohere\"} for _ in COHERE_MODEL_NAMES],\n ),\n \"03_api_key\": SecretStrInput(\n name=\"api_key\",\n display_name=\"API Key\",\n info=\"Provider API key for embedding model\",\n required=True,\n load_from_db=False,\n ),\n },\n },\n }\n }\n )\n\n # ------ Inputs --------------------------------------------------------\n inputs = [\n DropdownInput(\n name=\"knowledge_base\",\n display_name=\"Knowledge\",\n info=\"Select the knowledge to load data from.\",\n required=True,\n options=[],\n refresh_button=True,\n real_time_refresh=True,\n dialog_inputs=asdict(NewKnowledgeBaseInput()),\n ),\n HandleInput(\n name=\"input_df\",\n display_name=\"Input\",\n info=(\n \"Table with all original columns (already chunked / processed). \"\n \"Accepts Data or DataFrame. If Data is provided, it is converted to a DataFrame automatically.\"\n ),\n input_types=[\"Data\", \"DataFrame\"],\n required=True,\n ),\n TableInput(\n name=\"column_config\",\n display_name=\"Column Configuration\",\n info=\"Configure column behavior for the knowledge base.\",\n required=True,\n table_schema=[\n {\n \"name\": \"column_name\",\n \"display_name\": \"Column Name\",\n \"type\": \"str\",\n \"description\": \"Name of the column in the source DataFrame\",\n \"edit_mode\": EditMode.INLINE,\n },\n {\n \"name\": \"vectorize\",\n \"display_name\": \"Vectorize\",\n \"type\": \"boolean\",\n \"description\": \"Create embeddings for this column\",\n \"default\": False,\n \"edit_mode\": EditMode.INLINE,\n },\n {\n \"name\": \"identifier\",\n \"display_name\": \"Identifier\",\n \"type\": \"boolean\",\n \"description\": \"Use this column as unique identifier\",\n \"default\": False,\n \"edit_mode\": EditMode.INLINE,\n },\n ],\n value=[\n {\n \"column_name\": \"text\",\n \"vectorize\": True,\n \"identifier\": True,\n },\n ],\n ),\n IntInput(\n name=\"chunk_size\",\n display_name=\"Chunk Size\",\n info=\"Batch size for processing embeddings\",\n advanced=True,\n value=1000,\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Embedding Provider API Key\",\n info=\"API key for the embedding provider to generate embeddings.\",\n advanced=True,\n required=False,\n ),\n BoolInput(\n name=\"allow_duplicates\",\n display_name=\"Allow Duplicates\",\n info=\"Allow duplicate rows in the knowledge base\",\n advanced=True,\n value=False,\n ),\n ]\n\n # ------ Outputs -------------------------------------------------------\n outputs = [Output(display_name=\"Results\", name=\"dataframe_output\", method=\"build_kb_info\")]\n\n # ------ Internal helpers ---------------------------------------------\n def _get_kb_root(self) -> Path:\n \"\"\"Return the root directory for knowledge bases.\"\"\"\n return KNOWLEDGE_BASES_ROOT_PATH\n\n def _validate_column_config(self, df_source: pd.DataFrame) -> list[dict[str, Any]]:\n 
\"\"\"Validate column configuration using Structured Output patterns.\"\"\"\n if not self.column_config:\n msg = \"Column configuration cannot be empty\"\n raise ValueError(msg)\n\n # Convert table input to list of dicts (similar to Structured Output)\n config_list = self.column_config if isinstance(self.column_config, list) else []\n\n # Validate column names exist in DataFrame\n df_columns = set(df_source.columns)\n for config in config_list:\n col_name = config.get(\"column_name\")\n if col_name not in df_columns:\n msg = f\"Column '{col_name}' not found in DataFrame. Available columns: {sorted(df_columns)}\"\n raise ValueError(msg)\n\n return config_list\n\n def _get_embedding_provider(self, embedding_model: str) -> str:\n \"\"\"Get embedding provider by matching model name to lists.\"\"\"\n if embedding_model in OPENAI_EMBEDDING_MODEL_NAMES:\n return \"OpenAI\"\n if embedding_model in HUGGINGFACE_MODEL_NAMES:\n return \"HuggingFace\"\n if embedding_model in COHERE_MODEL_NAMES:\n return \"Cohere\"\n return \"Custom\"\n\n def _build_embeddings(self, embedding_model: str, api_key: str):\n \"\"\"Build embedding model using provider patterns.\"\"\"\n # Get provider by matching model name to lists\n provider = self._get_embedding_provider(embedding_model)\n\n # Validate provider and model\n if provider == \"OpenAI\":\n from langchain_openai import OpenAIEmbeddings\n\n if not api_key:\n msg = \"OpenAI API key is required when using OpenAI provider\"\n raise ValueError(msg)\n return OpenAIEmbeddings(\n model=embedding_model,\n api_key=api_key,\n chunk_size=self.chunk_size,\n )\n if provider == \"HuggingFace\":\n from langchain_huggingface import HuggingFaceEmbeddings\n\n return HuggingFaceEmbeddings(\n model=embedding_model,\n )\n if provider == \"Cohere\":\n from langchain_cohere import CohereEmbeddings\n\n if not api_key:\n msg = \"Cohere API key is required when using Cohere provider\"\n raise ValueError(msg)\n return CohereEmbeddings(\n model=embedding_model,\n cohere_api_key=api_key,\n )\n if provider == \"Custom\":\n # For custom embedding models, we would need additional configuration\n msg = \"Custom embedding models not yet supported\"\n raise NotImplementedError(msg)\n msg = f\"Unknown provider: {provider}\"\n raise ValueError(msg)\n\n def _build_embedding_metadata(self, embedding_model, api_key) -> dict[str, Any]:\n \"\"\"Build embedding model metadata.\"\"\"\n # Get provider by matching model name to lists\n embedding_provider = self._get_embedding_provider(embedding_model)\n\n api_key_to_save = None\n if api_key and hasattr(api_key, \"get_secret_value\"):\n api_key_to_save = api_key.get_secret_value()\n elif isinstance(api_key, str):\n api_key_to_save = api_key\n\n encrypted_api_key = None\n if api_key_to_save:\n settings_service = get_settings_service()\n try:\n encrypted_api_key = encrypt_api_key(api_key_to_save, settings_service=settings_service)\n except (TypeError, ValueError) as e:\n self.log(f\"Could not encrypt API key: {e}\")\n logger.error(f\"Could not encrypt API key: {e}\")\n\n return {\n \"embedding_provider\": embedding_provider,\n \"embedding_model\": embedding_model,\n \"api_key\": encrypted_api_key,\n \"api_key_used\": bool(api_key),\n \"chunk_size\": self.chunk_size,\n \"created_at\": datetime.now(timezone.utc).isoformat(),\n }\n\n def _save_embedding_metadata(self, kb_path: Path, embedding_model: str, api_key: str) -> None:\n \"\"\"Save embedding model metadata.\"\"\"\n embedding_metadata = self._build_embedding_metadata(embedding_model, api_key)\n 
metadata_path = kb_path / \"embedding_metadata.json\"\n metadata_path.write_text(json.dumps(embedding_metadata, indent=2))\n\n def _save_kb_files(\n self,\n kb_path: Path,\n config_list: list[dict[str, Any]],\n ) -> None:\n \"\"\"Save KB files using File Component storage patterns.\"\"\"\n try:\n # Create directory (following File Component patterns)\n kb_path.mkdir(parents=True, exist_ok=True)\n\n # Save column configuration\n # Only do this if the file doesn't exist already\n cfg_path = kb_path / \"schema.json\"\n if not cfg_path.exists():\n cfg_path.write_text(json.dumps(config_list, indent=2))\n\n except (OSError, TypeError, ValueError) as e:\n self.log(f\"Error saving KB files: {e}\")\n\n def _build_column_metadata(self, config_list: list[dict[str, Any]], df_source: pd.DataFrame) -> dict[str, Any]:\n \"\"\"Build detailed column metadata.\"\"\"\n metadata: dict[str, Any] = {\n \"total_columns\": len(df_source.columns),\n \"mapped_columns\": len(config_list),\n \"unmapped_columns\": len(df_source.columns) - len(config_list),\n \"columns\": [],\n \"summary\": {\"vectorized_columns\": [], \"identifier_columns\": []},\n }\n\n for config in config_list:\n col_name = config.get(\"column_name\")\n vectorize = config.get(\"vectorize\") == \"True\" or config.get(\"vectorize\") is True\n identifier = config.get(\"identifier\") == \"True\" or config.get(\"identifier\") is True\n\n # Add to columns list\n metadata[\"columns\"].append(\n {\n \"name\": col_name,\n \"vectorize\": vectorize,\n \"identifier\": identifier,\n }\n )\n\n # Update summary\n if vectorize:\n metadata[\"summary\"][\"vectorized_columns\"].append(col_name)\n if identifier:\n metadata[\"summary\"][\"identifier_columns\"].append(col_name)\n\n return metadata\n\n async def _create_vector_store(\n self, df_source: pd.DataFrame, config_list: list[dict[str, Any]], embedding_model: str, api_key: str\n ) -> None:\n \"\"\"Create vector store following Local DB component pattern.\"\"\"\n try:\n # Set up vector store directory\n vector_store_dir = await self._kb_path()\n if not vector_store_dir:\n msg = \"Knowledge base path is not set. 
Please create a new knowledge base first.\"\n raise ValueError(msg)\n vector_store_dir.mkdir(parents=True, exist_ok=True)\n\n # Create embeddings model\n embedding_function = self._build_embeddings(embedding_model, api_key)\n\n # Convert DataFrame to Data objects (following Local DB pattern)\n data_objects = await self._convert_df_to_data_objects(df_source, config_list)\n\n # Create vector store\n chroma = Chroma(\n persist_directory=str(vector_store_dir),\n embedding_function=embedding_function,\n collection_name=self.knowledge_base,\n )\n\n # Convert Data objects to LangChain Documents\n documents = []\n for data_obj in data_objects:\n doc = data_obj.to_lc_document()\n documents.append(doc)\n\n # Add documents to vector store\n if documents:\n chroma.add_documents(documents)\n self.log(f\"Added {len(documents)} documents to vector store '{self.knowledge_base}'\")\n\n except (OSError, ValueError, RuntimeError) as e:\n self.log(f\"Error creating vector store: {e}\")\n\n async def _convert_df_to_data_objects(\n self, df_source: pd.DataFrame, config_list: list[dict[str, Any]]\n ) -> list[Data]:\n \"\"\"Convert DataFrame to Data objects for vector store.\"\"\"\n data_objects: list[Data] = []\n\n # Set up vector store directory\n kb_path = await self._kb_path()\n\n # If we don't allow duplicates, we need to get the existing hashes\n chroma = Chroma(\n persist_directory=str(kb_path),\n collection_name=self.knowledge_base,\n )\n\n # Get all documents and their metadata\n all_docs = chroma.get()\n\n # Extract all _id values from metadata\n id_list = [metadata.get(\"_id\") for metadata in all_docs[\"metadatas\"] if metadata.get(\"_id\")]\n\n # Get column roles\n content_cols = []\n identifier_cols = []\n\n for config in config_list:\n col_name = config.get(\"column_name\")\n vectorize = config.get(\"vectorize\") == \"True\" or config.get(\"vectorize\") is True\n identifier = config.get(\"identifier\") == \"True\" or config.get(\"identifier\") is True\n\n if vectorize:\n content_cols.append(col_name)\n elif identifier:\n identifier_cols.append(col_name)\n\n # Convert each row to a Data object\n for _, row in df_source.iterrows():\n # Build content text from identifier columns using list comprehension\n identifier_parts = [str(row[col]) for col in content_cols if col in row and pd.notna(row[col])]\n\n # Join all parts into a single string\n page_content = \" \".join(identifier_parts)\n\n # Build metadata from NON-vectorized columns only (simple key-value pairs)\n data_dict = {\n \"text\": page_content, # Main content for vectorization\n }\n\n # Add identifier columns if they exist\n if identifier_cols:\n identifier_parts = [str(row[col]) for col in identifier_cols if col in row and pd.notna(row[col])]\n page_content = \" \".join(identifier_parts)\n\n # Add metadata columns as simple key-value pairs\n for col in df_source.columns:\n if col not in content_cols and col in row and pd.notna(row[col]):\n # Convert to simple types for Chroma metadata\n value = row[col]\n data_dict[col] = str(value) # Convert complex types to string\n\n # Hash the page_content for unique ID\n page_content_hash = hashlib.sha256(page_content.encode()).hexdigest()\n data_dict[\"_id\"] = page_content_hash\n\n # If duplicates are disallowed, and hash exists, prevent adding this row\n if not self.allow_duplicates and page_content_hash in id_list:\n self.log(f\"Skipping duplicate row with hash {page_content_hash}\")\n continue\n\n # Create Data object - everything except \"text\" becomes metadata\n data_obj = 
Data(data=data_dict)\n data_objects.append(data_obj)\n\n return data_objects\n\n def is_valid_collection_name(self, name, min_length: int = 3, max_length: int = 63) -> bool:\n \"\"\"Validates collection name against conditions 1-3.\n\n 1. Contains 3-63 characters\n 2. Starts and ends with alphanumeric character\n 3. Contains only alphanumeric characters, underscores, or hyphens.\n\n Args:\n name (str): Collection name to validate\n min_length (int): Minimum length of the name\n max_length (int): Maximum length of the name\n\n Returns:\n bool: True if valid, False otherwise\n \"\"\"\n # Check length (condition 1)\n if not (min_length <= len(name) <= max_length):\n return False\n\n # Check start/end with alphanumeric (condition 2)\n if not (name[0].isalnum() and name[-1].isalnum()):\n return False\n\n # Check allowed characters (condition 3)\n return re.match(r\"^[a-zA-Z0-9_-]+$\", name) is not None\n\n async def _kb_path(self) -> Path | None:\n # Check if we already have the path cached\n cached_path = getattr(self, \"_cached_kb_path\", None)\n if cached_path is not None:\n return cached_path\n\n # If not cached, compute it\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching knowledge base path.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n\n kb_root = self._get_kb_root()\n\n # Cache the result\n self._cached_kb_path = kb_root / kb_user / self.knowledge_base\n\n return self._cached_kb_path\n\n # ---------------------------------------------------------------------\n # OUTPUT METHODS\n # ---------------------------------------------------------------------\n async def build_kb_info(self) -> Data:\n \"\"\"Main ingestion routine → returns a dict with KB metadata.\"\"\"\n try:\n input_value = self.input_df[0] if isinstance(self.input_df, list) else self.input_df\n df_source: DataFrame = convert_to_dataframe(input_value)\n\n # Validate column configuration (using Structured Output patterns)\n config_list = self._validate_column_config(df_source)\n column_metadata = self._build_column_metadata(config_list, df_source)\n\n # Read the embedding info from the knowledge base folder\n kb_path = await self._kb_path()\n if not kb_path:\n msg = \"Knowledge base path is not set. Please create a new knowledge base first.\"\n raise ValueError(msg)\n metadata_path = kb_path / \"embedding_metadata.json\"\n\n # If the API key is not provided, try to read it from the metadata file\n if metadata_path.exists():\n settings_service = get_settings_service()\n metadata = json.loads(metadata_path.read_text())\n embedding_model = metadata.get(\"embedding_model\")\n try:\n api_key = decrypt_api_key(metadata[\"api_key\"], settings_service)\n except (InvalidToken, TypeError, ValueError) as e:\n logger.error(f\"Could not decrypt API key. Please provide it manually. 
Error: {e}\")\n\n # Check if a custom API key was provided, update metadata if so\n if self.api_key:\n api_key = self.api_key\n self._save_embedding_metadata(\n kb_path=kb_path,\n embedding_model=embedding_model,\n api_key=api_key,\n )\n\n # Create vector store following Local DB component pattern\n await self._create_vector_store(df_source, config_list, embedding_model=embedding_model, api_key=api_key)\n\n # Save KB files (using File Component storage patterns)\n self._save_kb_files(kb_path, config_list)\n\n # Build metadata response\n meta: dict[str, Any] = {\n \"kb_id\": str(uuid.uuid4()),\n \"kb_name\": self.knowledge_base,\n \"rows\": len(df_source),\n \"column_metadata\": column_metadata,\n \"path\": str(kb_path),\n \"config_columns\": len(config_list),\n \"timestamp\": datetime.now(tz=timezone.utc).isoformat(),\n }\n\n # Set status message\n self.status = f\"✅ KB **{self.knowledge_base}** saved · {len(df_source)} chunks.\"\n\n return Data(data=meta)\n\n except (OSError, ValueError, RuntimeError, KeyError) as e:\n msg = f\"Error during KB ingestion: {e}\"\n raise RuntimeError(msg) from e\n\n async def _get_api_key_variable(self, field_value: dict[str, Any]):\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching global variables.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n variable_service = get_variable_service()\n\n # Process the api_key field variable\n return await variable_service.get_variable(\n user_id=current_user.id,\n name=field_value[\"03_api_key\"],\n field=\"\",\n session=db,\n )\n\n async def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any,\n field_name: str | None = None,\n ) -> dotdict:\n \"\"\"Update build configuration based on provider selection.\"\"\"\n # Create a new knowledge base\n if field_name == \"knowledge_base\":\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching knowledge base list.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n if isinstance(field_value, dict) and \"01_new_kb_name\" in field_value:\n # Validate the knowledge base name - Make sure it follows these rules:\n if not self.is_valid_collection_name(field_value[\"01_new_kb_name\"]):\n msg = f\"Invalid knowledge base name: {field_value['01_new_kb_name']}\"\n raise ValueError(msg)\n\n api_key = field_value.get(\"03_api_key\", None)\n with contextlib.suppress(Exception):\n # If the API key is a variable, resolve it\n api_key = await self._get_api_key_variable(field_value)\n\n # Make sure api_key is a string\n if not isinstance(api_key, str):\n msg = \"API key must be a string.\"\n raise ValueError(msg)\n\n # We need to test the API Key one time against the embedding model\n embed_model = self._build_embeddings(embedding_model=field_value[\"02_embedding_model\"], api_key=api_key)\n\n # Try to generate a dummy embedding to validate the API key without blocking the event loop\n try:\n await asyncio.wait_for(\n asyncio.to_thread(embed_model.embed_query, \"test\"),\n timeout=10,\n )\n except TimeoutError as e:\n msg = \"Embedding validation timed out. 
Please verify network connectivity and key.\"\n raise ValueError(msg) from e\n except Exception as e:\n msg = f\"Embedding validation failed: {e!s}\"\n raise ValueError(msg) from e\n\n # Create the new knowledge base directory\n kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / field_value[\"01_new_kb_name\"]\n kb_path.mkdir(parents=True, exist_ok=True)\n\n # Save the embedding metadata\n build_config[\"knowledge_base\"][\"value\"] = field_value[\"01_new_kb_name\"]\n self._save_embedding_metadata(\n kb_path=kb_path,\n embedding_model=field_value[\"02_embedding_model\"],\n api_key=api_key,\n )\n\n # Update the knowledge base options dynamically\n build_config[\"knowledge_base\"][\"options\"] = await get_knowledge_bases(\n KNOWLEDGE_BASES_ROOT_PATH,\n user_id=self.user_id,\n )\n\n # If the selected knowledge base is not available, reset it\n if build_config[\"knowledge_base\"][\"value\"] not in build_config[\"knowledge_base\"][\"options\"]:\n build_config[\"knowledge_base\"][\"value\"] = None\n\n return build_config\n" }, "column_config": { "_input_type": "TableInput", @@ -933,18 +932,19 @@ "value": [ { "column_name": "text", - "identifier": false, + "identifier": true, "vectorize": true } ] }, "input_df": { - "_input_type": "DataFrameInput", + "_input_type": "HandleInput", "advanced": false, - "display_name": "Data", + "display_name": "Input", "dynamic": false, - "info": "Table with all original columns (already chunked / processed).", + "info": "Table with all original columns (already chunked / processed). Accepts Data or DataFrame. If Data is provided, it is converted to a DataFrame automatically.", "input_types": [ + "Data", "DataFrame" ], "list": false, @@ -954,8 +954,6 @@ "required": true, "show": true, "title_case": false, - "tool_mode": false, - "trace_as_input": true, "trace_as_metadata": true, "type": "other", "value": "" @@ -1054,7 +1052,7 @@ "dynamic": false, "info": "Provider API key for embedding model", "input_types": [], - "load_from_db": true, + "load_from_db": false, "name": "api_key", "password": true, "placeholder": "", @@ -1077,6 +1075,7 @@ "options": [], "options_metadata": [], "placeholder": "", + "real_time_refresh": true, "refresh_button": true, "required": true, "show": true, @@ -1091,33 +1090,33 @@ "tool_mode": false }, "showNode": true, - "type": "KBIngestion" + "type": "KnowledgeIngestion" }, "dragging": false, - "id": "KBIngestion-jj5iW", + "id": "KnowledgeIngestion-uSOy6", "measured": { "height": 333, "width": 320 }, "position": { - "x": 1000.4023842644599, - "y": 101.77068666606948 + "x": 1001.4259863865477, + "y": 110.75185048793182 }, "selected": false, "type": "genericNode" } ], "viewport": { - "x": 280.03407172860966, - "y": 131.39479654897661, - "zoom": 0.9295918751284687 + "x": 283.8838616133703, + "y": 160.13925386069144, + "zoom": 0.937778878472336 } }, "description": "An example of creating a Knowledge Base and ingesting data into it from a web URL.", "endpoint_name": null, - "id": "dfffa40b-547b-46ae-9c4a-6539851990bf", + "id": "fd03902d-38ec-4bc5-ac00-c308bbcca359", "is_component": false, - "last_tested_version": "1.5.0.post1", + "last_tested_version": "1.5.0.post2", "name": "Knowledge Ingestion", "tags": [] } \ No newline at end of file diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json b/src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json index afe778eef487..97d40d442341 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json +++ 
b/src/backend/base/langflow/initial_setup/starter_projects/Knowledge Retrieval.json @@ -2,66 +2,64 @@ "data": { "edges": [ { - "animated": false, + "className": "", "data": { "sourceHandle": { - "dataType": "TextInput", - "id": "TextInput-WyJxO", - "name": "text", + "dataType": "KnowledgeRetrieval", + "id": "KnowledgeRetrieval-kgwih", + "name": "retrieve_data", "output_types": [ - "Message" + "DataFrame" ] }, "targetHandle": { - "fieldName": "search_query", - "id": "KBRetrieval-zz3I0", + "fieldName": "input_value", + "id": "ChatOutput-OG4M9", "inputTypes": [ + "Data", + "DataFrame", "Message" ], - "type": "str" + "type": "other" } }, - "id": "xy-edge__TextInput-WyJxO{œdataTypeœ:œTextInputœ,œidœ:œTextInput-WyJxOœ,œnameœ:œtextœ,œoutput_typesœ:[œMessageœ]}-KBRetrieval-zz3I0{œfieldNameœ:œsearch_queryœ,œidœ:œKBRetrieval-zz3I0œ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}", - "selected": false, - "source": "TextInput-WyJxO", - "sourceHandle": "{œdataTypeœ: œTextInputœ, œidœ: œTextInput-WyJxOœ, œnameœ: œtextœ, œoutput_typesœ: [œMessageœ]}", - "target": "KBRetrieval-zz3I0", - "targetHandle": "{œfieldNameœ: œsearch_queryœ, œidœ: œKBRetrieval-zz3I0œ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}" + "id": "xy-edge__KnowledgeRetrieval-kgwih{œdataTypeœ:œKnowledgeRetrievalœ,œidœ:œKnowledgeRetrieval-kgwihœ,œnameœ:œretrieve_dataœ,œoutput_typesœ:[œDataFrameœ]}-ChatOutput-OG4M9{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-OG4M9œ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", + "source": "KnowledgeRetrieval-kgwih", + "sourceHandle": "{œdataTypeœ: œKnowledgeRetrievalœ, œidœ: œKnowledgeRetrieval-kgwihœ, œnameœ: œretrieve_dataœ, œoutput_typesœ: [œDataFrameœ]}", + "target": "ChatOutput-OG4M9", + "targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œChatOutput-OG4M9œ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}" }, { - "animated": false, + "className": "", "data": { "sourceHandle": { - "dataType": "KBRetrieval", - "id": "KBRetrieval-zz3I0", - "name": "chroma_kb_data", + "dataType": "TextInput", + "id": "TextInput-k48NL", + "name": "text", "output_types": [ - "DataFrame" + "Message" ] }, "targetHandle": { - "fieldName": "input_value", - "id": "ChatOutput-N7nxz", + "fieldName": "search_query", + "id": "KnowledgeRetrieval-kgwih", "inputTypes": [ - "Data", - "DataFrame", "Message" ], - "type": "other" + "type": "str" } }, - "id": "xy-edge__KBRetrieval-zz3I0{œdataTypeœ:œKBRetrievalœ,œidœ:œKBRetrieval-zz3I0œ,œnameœ:œchroma_kb_dataœ,œoutput_typesœ:[œDataFrameœ]}-ChatOutput-N7nxz{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-N7nxzœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", - "selected": false, - "source": "KBRetrieval-zz3I0", - "sourceHandle": "{œdataTypeœ: œKBRetrievalœ, œidœ: œKBRetrieval-zz3I0œ, œnameœ: œchroma_kb_dataœ, œoutput_typesœ: [œDataFrameœ]}", - "target": "ChatOutput-N7nxz", - "targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œChatOutput-N7nxzœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}" + "id": "xy-edge__TextInput-k48NL{œdataTypeœ:œTextInputœ,œidœ:œTextInput-k48NLœ,œnameœ:œtextœ,œoutput_typesœ:[œMessageœ]}-KnowledgeRetrieval-kgwih{œfieldNameœ:œsearch_queryœ,œidœ:œKnowledgeRetrieval-kgwihœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}", + "source": "TextInput-k48NL", + "sourceHandle": "{œdataTypeœ: œTextInputœ, œidœ: œTextInput-k48NLœ, œnameœ: œtextœ, œoutput_typesœ: [œMessageœ]}", + "target": "KnowledgeRetrieval-kgwih", + "targetHandle": "{œfieldNameœ: œsearch_queryœ, œidœ: œKnowledgeRetrieval-kgwihœ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}" } ], 
"nodes": [ { "data": { - "id": "note-f86G8", + "id": "note-6CE3d", "node": { "description": "## Knowledge Retrieval\n\nA stand-alone component handles the retrieval of ingested knowledge from existing knowledge bases. To retrieve knowledge:\n\n1. Select your knowledge base from the Knowledge Base dropdown. If you do not see it, choose \"Refresh List\".\n2. (Optional) Enter a Search Query to be performed against the knowledge base.\n\nNote that by default, 5 results are returned, which can be configured by clicking Controls at the top of the component.\n", "display_name": "", @@ -72,7 +70,7 @@ }, "dragging": false, "height": 384, - "id": "note-f86G8", + "id": "note-6CE3d", "measured": { "height": 384, "width": 371 @@ -88,7 +86,7 @@ }, { "data": { - "id": "TextInput-WyJxO", + "id": "TextInput-k48NL", "node": { "base_classes": [ "Message" @@ -191,7 +189,7 @@ "type": "TextInput" }, "dragging": false, - "id": "TextInput-WyJxO", + "id": "TextInput-k48NL", "measured": { "height": 204, "width": 320 @@ -205,7 +203,7 @@ }, { "data": { - "id": "ChatOutput-N7nxz", + "id": "ChatOutput-OG4M9", "node": { "base_classes": [ "Message" @@ -520,7 +518,7 @@ "type": "ChatOutput" }, "dragging": false, - "id": "ChatOutput-N7nxz", + "id": "ChatOutput-OG4M9", "measured": { "height": 48, "width": 192 @@ -534,7 +532,7 @@ }, { "data": { - "id": "KBRetrieval-zz3I0", + "id": "KnowledgeRetrieval-kgwih", "node": { "base_classes": [ "DataFrame" @@ -551,14 +549,15 @@ "api_key", "search_query", "top_k", - "include_metadata" + "include_metadata", + "include_embeddings" ], "frozen": false, - "icon": "database", - "last_updated": "2025-08-14T17:19:22.182Z", + "icon": "download", + "last_updated": "2025-08-26T16:19:16.681Z", "legacy": false, "metadata": { - "code_hash": "6fcf86be1aca", + "code_hash": "1eaf43b9b167", "dependencies": { "dependencies": [ { @@ -596,7 +595,7 @@ ], "total_dependencies": 8 }, - "module": "langflow.components.data.kb_retrieval.KBRetrievalComponent" + "module": "langflow.components.knowledge_bases.retrieval.KnowledgeRetrievalComponent" }, "minimized": false, "output_types": [], @@ -606,8 +605,8 @@ "cache": true, "display_name": "Results", "group_outputs": false, - "method": "get_chroma_kb_data", - "name": "chroma_kb_data", + "method": "retrieve_data", + "name": "retrieve_data", "selected": "DataFrame", "tool_mode": true, "types": [ @@ -652,14 +651,32 @@ "show": true, "title_case": false, "type": "code", - "value": "import json\nfrom pathlib import Path\nfrom typing import Any\n\nfrom cryptography.fernet import InvalidToken\nfrom langchain_chroma import Chroma\nfrom loguru import logger\nfrom pydantic import SecretStr\n\nfrom langflow.base.data.kb_utils import get_knowledge_bases\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.services.auth.utils import decrypt_api_key\nfrom langflow.services.database.models.user.crud import get_user_by_id\nfrom langflow.services.deps import get_settings_service, session_scope\n\nsettings = get_settings_service().settings\nknowledge_directory = settings.knowledge_bases_dir\nif not knowledge_directory:\n msg = \"Knowledge bases directory is not set in the settings.\"\n raise ValueError(msg)\nKNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()\n\n\nclass KBRetrievalComponent(Component):\n display_name = \"Knowledge Retrieval\"\n description = \"Search and retrieve 
data from knowledge.\"\n icon = \"database\"\n name = \"KBRetrieval\"\n\n inputs = [\n DropdownInput(\n name=\"knowledge_base\",\n display_name=\"Knowledge\",\n info=\"Select the knowledge to load data from.\",\n required=True,\n options=[],\n refresh_button=True,\n real_time_refresh=True,\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Embedding Provider API Key\",\n info=\"API key for the embedding provider to generate embeddings.\",\n advanced=True,\n required=False,\n ),\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n info=\"Optional search query to filter knowledge base data.\",\n ),\n IntInput(\n name=\"top_k\",\n display_name=\"Top K Results\",\n info=\"Number of top results to return from the knowledge base.\",\n value=5,\n advanced=True,\n required=False,\n ),\n BoolInput(\n name=\"include_metadata\",\n display_name=\"Include Metadata\",\n info=\"Whether to include all metadata and embeddings in the output. If false, only content is returned.\",\n value=True,\n advanced=False,\n ),\n ]\n\n outputs = [\n Output(\n name=\"chroma_kb_data\",\n display_name=\"Results\",\n method=\"get_chroma_kb_data\",\n info=\"Returns the data from the selected knowledge base.\",\n ),\n ]\n\n async def update_build_config(self, build_config, field_value, field_name=None): # noqa: ARG002\n if field_name == \"knowledge_base\":\n # Update the knowledge base options dynamically\n build_config[\"knowledge_base\"][\"options\"] = await get_knowledge_bases(\n KNOWLEDGE_BASES_ROOT_PATH,\n user_id=self.user_id, # Use the user_id from the component context\n )\n\n # If the selected knowledge base is not available, reset it\n if build_config[\"knowledge_base\"][\"value\"] not in build_config[\"knowledge_base\"][\"options\"]:\n build_config[\"knowledge_base\"][\"value\"] = None\n\n return build_config\n\n def _get_kb_metadata(self, kb_path: Path) -> dict:\n \"\"\"Load and process knowledge base metadata.\"\"\"\n metadata: dict[str, Any] = {}\n metadata_file = kb_path / \"embedding_metadata.json\"\n if not metadata_file.exists():\n logger.warning(f\"Embedding metadata file not found at {metadata_file}\")\n return metadata\n\n try:\n with metadata_file.open(\"r\", encoding=\"utf-8\") as f:\n metadata = json.load(f)\n except json.JSONDecodeError:\n logger.error(f\"Error decoding JSON from {metadata_file}\")\n return {}\n\n # Decrypt API key if it exists\n if \"api_key\" in metadata and metadata.get(\"api_key\"):\n settings_service = get_settings_service()\n try:\n decrypted_key = decrypt_api_key(metadata[\"api_key\"], settings_service)\n metadata[\"api_key\"] = decrypted_key\n except (InvalidToken, TypeError, ValueError) as e:\n logger.error(f\"Could not decrypt API key. Please provide it manually. Error: {e}\")\n metadata[\"api_key\"] = None\n return metadata\n\n def _build_embeddings(self, metadata: dict):\n \"\"\"Build embedding model from metadata.\"\"\"\n runtime_api_key = self.api_key.get_secret_value() if isinstance(self.api_key, SecretStr) else self.api_key\n provider = metadata.get(\"embedding_provider\")\n model = metadata.get(\"embedding_model\")\n api_key = runtime_api_key or metadata.get(\"api_key\")\n chunk_size = metadata.get(\"chunk_size\")\n\n # Handle various providers\n if provider == \"OpenAI\":\n from langchain_openai import OpenAIEmbeddings\n\n if not api_key:\n msg = \"OpenAI API key is required. 
Provide it in the component's advanced settings.\"\n raise ValueError(msg)\n return OpenAIEmbeddings(\n model=model,\n api_key=api_key,\n chunk_size=chunk_size,\n )\n if provider == \"HuggingFace\":\n from langchain_huggingface import HuggingFaceEmbeddings\n\n return HuggingFaceEmbeddings(\n model=model,\n )\n if provider == \"Cohere\":\n from langchain_cohere import CohereEmbeddings\n\n if not api_key:\n msg = \"Cohere API key is required when using Cohere provider\"\n raise ValueError(msg)\n return CohereEmbeddings(\n model=model,\n cohere_api_key=api_key,\n )\n if provider == \"Custom\":\n # For custom embedding models, we would need additional configuration\n msg = \"Custom embedding models not yet supported\"\n raise NotImplementedError(msg)\n # Add other providers here if they become supported in ingest\n msg = f\"Embedding provider '{provider}' is not supported for retrieval.\"\n raise NotImplementedError(msg)\n\n async def get_chroma_kb_data(self) -> DataFrame:\n \"\"\"Retrieve data from the selected knowledge base by reading the Chroma collection.\n\n Returns:\n A DataFrame containing the data rows from the knowledge base.\n \"\"\"\n # Get the current user\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching Knowledge Base data.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / self.knowledge_base\n\n metadata = self._get_kb_metadata(kb_path)\n if not metadata:\n msg = f\"Metadata not found for knowledge base: {self.knowledge_base}. Ensure it has been indexed.\"\n raise ValueError(msg)\n\n # Build the embedder for the knowledge base\n embedding_function = self._build_embeddings(metadata)\n\n # Load vector store\n chroma = Chroma(\n persist_directory=str(kb_path),\n embedding_function=embedding_function,\n collection_name=self.knowledge_base,\n )\n\n # If a search query is provided, perform a similarity search\n if self.search_query:\n # Use the search query to perform a similarity search\n logger.info(f\"Performing similarity search with query: {self.search_query}\")\n results = chroma.similarity_search_with_score(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n else:\n results = chroma.similarity_search(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n\n # For each result, make it a tuple to match the expected output format\n results = [(doc, 0) for doc in results] # Assign a dummy score of 0\n\n # If metadata is enabled, get embeddings for the results\n id_to_embedding = {}\n if self.include_metadata and results:\n doc_ids = [doc[0].metadata.get(\"_id\") for doc in results if doc[0].metadata.get(\"_id\")]\n\n # Only proceed if we have valid document IDs\n if doc_ids:\n # Access underlying client to get embeddings\n collection = chroma._client.get_collection(name=self.knowledge_base)\n embeddings_result = collection.get(where={\"_id\": {\"$in\": doc_ids}}, include=[\"embeddings\", \"metadatas\"])\n\n # Create a mapping from document ID to embedding\n for i, metadata in enumerate(embeddings_result.get(\"metadatas\", [])):\n if metadata and \"_id\" in metadata:\n id_to_embedding[metadata[\"_id\"]] = embeddings_result[\"embeddings\"][i]\n\n # Build output data based on include_metadata setting\n data_list = []\n for doc in results:\n if self.include_metadata:\n # Include all metadata, embeddings, and 
content\n kwargs = {\n \"content\": doc[0].page_content,\n **doc[0].metadata,\n }\n if self.search_query:\n kwargs[\"_score\"] = -1 * doc[1]\n kwargs[\"_embeddings\"] = id_to_embedding.get(doc[0].metadata.get(\"_id\"))\n else:\n # Only include content\n kwargs = {\n \"content\": doc[0].page_content,\n }\n\n data_list.append(Data(**kwargs))\n\n # Return the DataFrame containing the data\n return DataFrame(data=data_list)\n" + "value": "import json\nfrom pathlib import Path\nfrom typing import Any\n\nfrom cryptography.fernet import InvalidToken\nfrom langchain_chroma import Chroma\nfrom loguru import logger\nfrom pydantic import SecretStr\n\nfrom langflow.base.knowledge_bases.knowledge_base_utils import get_knowledge_bases\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.services.auth.utils import decrypt_api_key\nfrom langflow.services.database.models.user.crud import get_user_by_id\nfrom langflow.services.deps import get_settings_service, session_scope\n\nsettings = get_settings_service().settings\nknowledge_directory = settings.knowledge_bases_dir\nif not knowledge_directory:\n msg = \"Knowledge bases directory is not set in the settings.\"\n raise ValueError(msg)\nKNOWLEDGE_BASES_ROOT_PATH = Path(knowledge_directory).expanduser()\n\n\nclass KnowledgeRetrievalComponent(Component):\n display_name = \"Knowledge Retrieval\"\n description = \"Search and retrieve data from knowledge.\"\n icon = \"download\"\n name = \"KnowledgeRetrieval\"\n\n inputs = [\n DropdownInput(\n name=\"knowledge_base\",\n display_name=\"Knowledge\",\n info=\"Select the knowledge to load data from.\",\n required=True,\n options=[],\n refresh_button=True,\n real_time_refresh=True,\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"Embedding Provider API Key\",\n info=\"API key for the embedding provider to generate embeddings.\",\n advanced=True,\n required=False,\n ),\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n info=\"Optional search query to filter knowledge base data.\",\n tool_mode=True,\n ),\n IntInput(\n name=\"top_k\",\n display_name=\"Top K Results\",\n info=\"Number of top results to return from the knowledge base.\",\n value=5,\n advanced=True,\n required=False,\n ),\n BoolInput(\n name=\"include_metadata\",\n display_name=\"Include Metadata\",\n info=\"Whether to include all metadata in the output. If false, only content is returned.\",\n value=True,\n advanced=False,\n ),\n BoolInput(\n name=\"include_embeddings\",\n display_name=\"Include Embeddings\",\n info=\"Whether to include embeddings in the output. 
Only applicable if 'Include Metadata' is enabled.\",\n value=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(\n name=\"retrieve_data\",\n display_name=\"Results\",\n method=\"retrieve_data\",\n info=\"Returns the data from the selected knowledge base.\",\n ),\n ]\n\n async def update_build_config(self, build_config, field_value, field_name=None): # noqa: ARG002\n if field_name == \"knowledge_base\":\n # Update the knowledge base options dynamically\n build_config[\"knowledge_base\"][\"options\"] = await get_knowledge_bases(\n KNOWLEDGE_BASES_ROOT_PATH,\n user_id=self.user_id, # Use the user_id from the component context\n )\n\n # If the selected knowledge base is not available, reset it\n if build_config[\"knowledge_base\"][\"value\"] not in build_config[\"knowledge_base\"][\"options\"]:\n build_config[\"knowledge_base\"][\"value\"] = None\n\n return build_config\n\n def _get_kb_metadata(self, kb_path: Path) -> dict:\n \"\"\"Load and process knowledge base metadata.\"\"\"\n metadata: dict[str, Any] = {}\n metadata_file = kb_path / \"embedding_metadata.json\"\n if not metadata_file.exists():\n logger.warning(f\"Embedding metadata file not found at {metadata_file}\")\n return metadata\n\n try:\n with metadata_file.open(\"r\", encoding=\"utf-8\") as f:\n metadata = json.load(f)\n except json.JSONDecodeError:\n logger.error(f\"Error decoding JSON from {metadata_file}\")\n return {}\n\n # Decrypt API key if it exists\n if \"api_key\" in metadata and metadata.get(\"api_key\"):\n settings_service = get_settings_service()\n try:\n decrypted_key = decrypt_api_key(metadata[\"api_key\"], settings_service)\n metadata[\"api_key\"] = decrypted_key\n except (InvalidToken, TypeError, ValueError) as e:\n logger.error(f\"Could not decrypt API key. Please provide it manually. Error: {e}\")\n metadata[\"api_key\"] = None\n return metadata\n\n def _build_embeddings(self, metadata: dict):\n \"\"\"Build embedding model from metadata.\"\"\"\n runtime_api_key = self.api_key.get_secret_value() if isinstance(self.api_key, SecretStr) else self.api_key\n provider = metadata.get(\"embedding_provider\")\n model = metadata.get(\"embedding_model\")\n api_key = runtime_api_key or metadata.get(\"api_key\")\n chunk_size = metadata.get(\"chunk_size\")\n\n # Handle various providers\n if provider == \"OpenAI\":\n from langchain_openai import OpenAIEmbeddings\n\n if not api_key:\n msg = \"OpenAI API key is required. 
Provide it in the component's advanced settings.\"\n raise ValueError(msg)\n return OpenAIEmbeddings(\n model=model,\n api_key=api_key,\n chunk_size=chunk_size,\n )\n if provider == \"HuggingFace\":\n from langchain_huggingface import HuggingFaceEmbeddings\n\n return HuggingFaceEmbeddings(\n model=model,\n )\n if provider == \"Cohere\":\n from langchain_cohere import CohereEmbeddings\n\n if not api_key:\n msg = \"Cohere API key is required when using Cohere provider\"\n raise ValueError(msg)\n return CohereEmbeddings(\n model=model,\n cohere_api_key=api_key,\n )\n if provider == \"Custom\":\n # For custom embedding models, we would need additional configuration\n msg = \"Custom embedding models not yet supported\"\n raise NotImplementedError(msg)\n # Add other providers here if they become supported in ingest\n msg = f\"Embedding provider '{provider}' is not supported for retrieval.\"\n raise NotImplementedError(msg)\n\n async def retrieve_data(self) -> DataFrame:\n \"\"\"Retrieve data from the selected knowledge base by reading the Chroma collection.\n\n Returns:\n A DataFrame containing the data rows from the knowledge base.\n \"\"\"\n # Get the current user\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching Knowledge Base data.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n if not current_user:\n msg = f\"User with ID {self.user_id} not found.\"\n raise ValueError(msg)\n kb_user = current_user.username\n kb_path = KNOWLEDGE_BASES_ROOT_PATH / kb_user / self.knowledge_base\n\n metadata = self._get_kb_metadata(kb_path)\n if not metadata:\n msg = f\"Metadata not found for knowledge base: {self.knowledge_base}. Ensure it has been indexed.\"\n raise ValueError(msg)\n\n # Build the embedder for the knowledge base\n embedding_function = self._build_embeddings(metadata)\n\n # Load vector store\n chroma = Chroma(\n persist_directory=str(kb_path),\n embedding_function=embedding_function,\n collection_name=self.knowledge_base,\n )\n\n # If a search query is provided, perform a similarity search\n if self.search_query:\n # Use the search query to perform a similarity search\n logger.info(f\"Performing similarity search with query: {self.search_query}\")\n results = chroma.similarity_search_with_score(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n else:\n results = chroma.similarity_search(\n query=self.search_query or \"\",\n k=self.top_k,\n )\n\n # For each result, make it a tuple to match the expected output format\n results = [(doc, 0) for doc in results] # Assign a dummy score of 0\n\n # If include_embeddings is enabled, get embeddings for the results\n id_to_embedding = {}\n if self.include_embeddings and results:\n doc_ids = [doc[0].metadata.get(\"_id\") for doc in results if doc[0].metadata.get(\"_id\")]\n\n # Only proceed if we have valid document IDs\n if doc_ids:\n # Access underlying client to get embeddings\n collection = chroma._client.get_collection(name=self.knowledge_base)\n embeddings_result = collection.get(where={\"_id\": {\"$in\": doc_ids}}, include=[\"metadatas\", \"embeddings\"])\n\n # Create a mapping from document ID to embedding\n for i, metadata in enumerate(embeddings_result.get(\"metadatas\", [])):\n if metadata and \"_id\" in metadata:\n id_to_embedding[metadata[\"_id\"]] = embeddings_result[\"embeddings\"][i]\n\n # Build output data based on include_metadata setting\n data_list = []\n for doc in results:\n kwargs = {\n \"content\": doc[0].page_content,\n }\n if 
self.search_query:\n kwargs[\"_score\"] = -1 * doc[1]\n if self.include_metadata:\n # Include all metadata, embeddings, and content\n kwargs.update(doc[0].metadata)\n if self.include_embeddings:\n kwargs[\"_embeddings\"] = id_to_embedding.get(doc[0].metadata.get(\"_id\"))\n\n data_list.append(Data(**kwargs))\n\n # Return the DataFrame containing the data\n return DataFrame(data=data_list)\n" + }, + "include_embeddings": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Include Embeddings", + "dynamic": false, + "info": "Whether to include embeddings in the output. Only applicable if 'Include Metadata' is enabled.", + "list": false, + "list_add_label": "Add More", + "name": "include_embeddings", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": false }, "include_metadata": { "_input_type": "BoolInput", "advanced": false, "display_name": "Include Metadata", "dynamic": false, - "info": "Whether to include all metadata and embeddings in the output. If false, only content is returned.", + "info": "Whether to include all metadata in the output. If false, only content is returned.", "list": false, "list_add_label": "Add More", "name": "include_metadata", @@ -712,7 +729,7 @@ "required": false, "show": true, "title_case": false, - "tool_mode": false, + "tool_mode": true, "trace_as_input": true, "trace_as_metadata": true, "type": "str", @@ -740,33 +757,33 @@ "tool_mode": false }, "showNode": true, - "type": "KBRetrieval" + "type": "KnowledgeRetrieval" }, "dragging": false, - "id": "KBRetrieval-zz3I0", + "id": "KnowledgeRetrieval-kgwih", "measured": { "height": 329, "width": 320 }, "position": { - "x": 616.6226476085393, - "y": -343.13068334363356 + "x": 635.7729107873928, + "y": -337.735556504938 }, "selected": false, "type": "genericNode" } ], "viewport": { - "x": 177.06633386268413, - "y": 482.8027480187026, - "zoom": 0.8999566725119924 + "x": 90.80151692622985, + "y": 591.5552904741861, + "zoom": 1.0053880437940252 } }, "description": "An example of performing a vector search against data in a Knowledge Base to retrieve relevant documents.", "endpoint_name": null, - "id": "5487ee05-73d5-4b12-9b41-bc4c3a2f9326", + "id": "e262d49f-c800-4962-948b-c94f79eb5fb8", "is_component": false, - "last_tested_version": "1.5.0.post1", + "last_tested_version": "1.5.0.post2", "name": "Knowledge Retrieval", "tags": [] } \ No newline at end of file diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index 7db8b7a5690d..9d4785b1f4e5 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -7,7 +7,7 @@ "data": { "sourceHandle": { "dataType": "ChatInput", - "id": "ChatInput-w1dxV", + "id": "ChatInput-insg9", "name": "message", "output_types": [ "Message" @@ -15,7 +15,7 @@ }, "targetHandle": { "fieldName": "question", - "id": "Prompt-jBCaI", + "id": "Prompt-wgAUs", "inputTypes": [ "Message", "Text" @@ -23,12 +23,12 @@ "type": "str" } }, - "id": "reactflow__edge-ChatInput-w1dxV{œdataTypeœ:œChatInputœ,œidœ:œChatInput-w1dxVœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-Prompt-jBCaI{œfieldNameœ:œquestionœ,œidœ:œPrompt-jBCaIœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}", + "id": 
"reactflow__edge-ChatInput-insg9{œdataTypeœ:œChatInputœ,œidœ:œChatInput-insg9œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-Prompt-wgAUs{œfieldNameœ:œquestionœ,œidœ:œPrompt-wgAUsœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}", "selected": false, - "source": "ChatInput-w1dxV", - "sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-w1dxVœ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}", - "target": "Prompt-jBCaI", - "targetHandle": "{œfieldNameœ: œquestionœ, œidœ: œPrompt-jBCaIœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}" + "source": "ChatInput-insg9", + "sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-insg9œ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}", + "target": "Prompt-wgAUs", + "targetHandle": "{œfieldNameœ: œquestionœ, œidœ: œPrompt-wgAUsœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}" }, { "animated": false, @@ -36,7 +36,7 @@ "data": { "sourceHandle": { "dataType": "parser", - "id": "parser-LlSmH", + "id": "parser-gcmKF", "name": "parsed_text", "output_types": [ "Message" @@ -44,7 +44,7 @@ }, "targetHandle": { "fieldName": "context", - "id": "Prompt-jBCaI", + "id": "Prompt-wgAUs", "inputTypes": [ "Message", "Text" @@ -52,12 +52,12 @@ "type": "str" } }, - "id": "reactflow__edge-parser-LlSmH{œdataTypeœ:œparserœ,œidœ:œparser-LlSmHœ,œnameœ:œparsed_textœ,œoutput_typesœ:[œMessageœ]}-Prompt-jBCaI{œfieldNameœ:œcontextœ,œidœ:œPrompt-jBCaIœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}", + "id": "reactflow__edge-parser-gcmKF{œdataTypeœ:œparserœ,œidœ:œparser-gcmKFœ,œnameœ:œparsed_textœ,œoutput_typesœ:[œMessageœ]}-Prompt-wgAUs{œfieldNameœ:œcontextœ,œidœ:œPrompt-wgAUsœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}", "selected": false, - "source": "parser-LlSmH", - "sourceHandle": "{œdataTypeœ: œparserœ, œidœ: œparser-LlSmHœ, œnameœ: œparsed_textœ, œoutput_typesœ: [œMessageœ]}", - "target": "Prompt-jBCaI", - "targetHandle": "{œfieldNameœ: œcontextœ, œidœ: œPrompt-jBCaIœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}" + "source": "parser-gcmKF", + "sourceHandle": "{œdataTypeœ: œparserœ, œidœ: œparser-gcmKFœ, œnameœ: œparsed_textœ, œoutput_typesœ: [œMessageœ]}", + "target": "Prompt-wgAUs", + "targetHandle": "{œfieldNameœ: œcontextœ, œidœ: œPrompt-wgAUsœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}" }, { "animated": false, @@ -65,7 +65,7 @@ "data": { "sourceHandle": { "dataType": "File", - "id": "File-TyJTr", + "id": "File-wqFzl", "name": "message", "output_types": [ "Message" @@ -73,7 +73,7 @@ }, "targetHandle": { "fieldName": "data_inputs", - "id": "SplitText-YWbhC", + "id": "SplitText-50nDI", "inputTypes": [ "Data", "DataFrame", @@ -82,12 +82,12 @@ "type": "other" } }, - "id": "reactflow__edge-File-TyJTr{œdataTypeœ:œFileœ,œidœ:œFile-TyJTrœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-YWbhC{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-YWbhCœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", + "id": "reactflow__edge-File-wqFzl{œdataTypeœ:œFileœ,œidœ:œFile-wqFzlœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-50nDI{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-50nDIœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", "selected": false, - "source": "File-TyJTr", - "sourceHandle": "{œdataTypeœ: œFileœ, œidœ: œFile-TyJTrœ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}", - "target": "SplitText-YWbhC", - "targetHandle": "{œfieldNameœ: œdata_inputsœ, œidœ: œSplitText-YWbhCœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}" + "source": "File-wqFzl", + "sourceHandle": "{œdataTypeœ: œFileœ, œidœ: 
œFile-wqFzlœ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}", + "target": "SplitText-50nDI", + "targetHandle": "{œfieldNameœ: œdata_inputsœ, œidœ: œSplitText-50nDIœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}" }, { "animated": false, @@ -95,7 +95,7 @@ "data": { "sourceHandle": { "dataType": "Prompt", - "id": "Prompt-jBCaI", + "id": "Prompt-wgAUs", "name": "prompt", "output_types": [ "Message" @@ -103,19 +103,19 @@ }, "targetHandle": { "fieldName": "input_value", - "id": "LanguageModelComponent-XEWmQ", + "id": "LanguageModelComponent-nQYc0", "inputTypes": [ "Message" ], "type": "str" } }, - "id": "reactflow__edge-Prompt-jBCaI{œdataTypeœ:œPromptœ,œidœ:œPrompt-jBCaIœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}-LanguageModelComponent-XEWmQ{œfieldNameœ:œinput_valueœ,œidœ:œLanguageModelComponent-XEWmQœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}", + "id": "reactflow__edge-Prompt-wgAUs{œdataTypeœ:œPromptœ,œidœ:œPrompt-wgAUsœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}-LanguageModelComponent-nQYc0{œfieldNameœ:œinput_valueœ,œidœ:œLanguageModelComponent-nQYc0œ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}", "selected": false, - "source": "Prompt-jBCaI", - "sourceHandle": "{œdataTypeœ: œPromptœ, œidœ: œPrompt-jBCaIœ, œnameœ: œpromptœ, œoutput_typesœ: [œMessageœ]}", - "target": "LanguageModelComponent-XEWmQ", - "targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œLanguageModelComponent-XEWmQœ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}" + "source": "Prompt-wgAUs", + "sourceHandle": "{œdataTypeœ: œPromptœ, œidœ: œPrompt-wgAUsœ, œnameœ: œpromptœ, œoutput_typesœ: [œMessageœ]}", + "target": "LanguageModelComponent-nQYc0", + "targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œLanguageModelComponent-nQYc0œ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}" }, { "animated": false, @@ -123,7 +123,7 @@ "data": { "sourceHandle": { "dataType": "LanguageModelComponent", - "id": "LanguageModelComponent-XEWmQ", + "id": "LanguageModelComponent-nQYc0", "name": "text_output", "output_types": [ "Message" @@ -131,7 +131,7 @@ }, "targetHandle": { "fieldName": "input_value", - "id": "ChatOutput-jIJTT", + "id": "ChatOutput-VG394", "inputTypes": [ "Data", "DataFrame", @@ -140,12 +140,12 @@ "type": "str" } }, - "id": "reactflow__edge-LanguageModelComponent-XEWmQ{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-XEWmQœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}-ChatOutput-jIJTT{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-jIJTTœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œstrœ}", + "id": "reactflow__edge-LanguageModelComponent-nQYc0{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-nQYc0œ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}-ChatOutput-VG394{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-VG394œ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œstrœ}", "selected": false, - "source": "LanguageModelComponent-XEWmQ", - "sourceHandle": "{œdataTypeœ: œLanguageModelComponentœ, œidœ: œLanguageModelComponent-XEWmQœ, œnameœ: œtext_outputœ, œoutput_typesœ: [œMessageœ]}", - "target": "ChatOutput-jIJTT", - "targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œChatOutput-jIJTTœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œstrœ}" + "source": "LanguageModelComponent-nQYc0", + "sourceHandle": "{œdataTypeœ: œLanguageModelComponentœ, œidœ: œLanguageModelComponent-nQYc0œ, œnameœ: œtext_outputœ, œoutput_typesœ: [œMessageœ]}", + "target": "ChatOutput-VG394", + "targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œChatOutput-VG394œ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], 
œtypeœ: œstrœ}" }, { "animated": false, @@ -153,7 +153,7 @@ "data": { "sourceHandle": { "dataType": "SplitText", - "id": "SplitText-YWbhC", + "id": "SplitText-50nDI", "name": "dataframe", "output_types": [ "DataFrame" @@ -161,7 +161,7 @@ }, "targetHandle": { "fieldName": "ingest_data", - "id": "AstraDB-92S9t", + "id": "AstraDB-t8lcj", "inputTypes": [ "Data", "DataFrame" @@ -169,20 +169,19 @@ "type": "other" } }, - "id": "reactflow__edge-SplitText-YWbhC{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-YWbhCœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-AstraDB-92S9t{œfieldNameœ:œingest_dataœ,œidœ:œAstraDB-92S9tœ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}", + "id": "reactflow__edge-SplitText-50nDI{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-50nDIœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-AstraDB-t8lcj{œfieldNameœ:œingest_dataœ,œidœ:œAstraDB-t8lcjœ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}", "selected": false, - "source": "SplitText-YWbhC", - "sourceHandle": "{œdataTypeœ: œSplitTextœ, œidœ: œSplitText-YWbhCœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}", - "target": "AstraDB-92S9t", - "targetHandle": "{œfieldNameœ: œingest_dataœ, œidœ: œAstraDB-92S9tœ, œinputTypesœ: [œDataœ, œDataFrameœ], œtypeœ: œotherœ}" + "source": "SplitText-50nDI", + "sourceHandle": "{œdataTypeœ: œSplitTextœ, œidœ: œSplitText-50nDIœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}", + "target": "AstraDB-t8lcj", + "targetHandle": "{œfieldNameœ: œingest_dataœ, œidœ: œAstraDB-t8lcjœ, œinputTypesœ: [œDataœ, œDataFrameœ], œtypeœ: œotherœ}" }, { - "animated": false, "className": "", "data": { "sourceHandle": { "dataType": "ChatInput", - "id": "ChatInput-w1dxV", + "id": "ChatInput-insg9", "name": "message", "output_types": [ "Message" @@ -190,26 +189,25 @@ }, "targetHandle": { "fieldName": "search_query", - "id": "AstraDB-Lm2es", + "id": "AstraDB-CLCyc", "inputTypes": [ "Message" ], "type": "query" } }, - "id": "reactflow__edge-ChatInput-w1dxV{œdataTypeœ:œChatInputœ,œidœ:œChatInput-w1dxVœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-AstraDB-Lm2es{œfieldNameœ:œsearch_queryœ,œidœ:œAstraDB-Lm2esœ,œinputTypesœ:[œMessageœ],œtypeœ:œqueryœ}", - "selected": false, - "source": "ChatInput-w1dxV", - "sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-w1dxVœ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}", - "target": "AstraDB-Lm2es", - "targetHandle": "{œfieldNameœ: œsearch_queryœ, œidœ: œAstraDB-Lm2esœ, œinputTypesœ: [œMessageœ], œtypeœ: œqueryœ}" + "id": "xy-edge__ChatInput-insg9{œdataTypeœ:œChatInputœ,œidœ:œChatInput-insg9œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-AstraDB-CLCyc{œfieldNameœ:œsearch_queryœ,œidœ:œAstraDB-CLCycœ,œinputTypesœ:[œMessageœ],œtypeœ:œqueryœ}", + "source": "ChatInput-insg9", + "sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-insg9œ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}", + "target": "AstraDB-CLCyc", + "targetHandle": "{œfieldNameœ: œsearch_queryœ, œidœ: œAstraDB-CLCycœ, œinputTypesœ: [œMessageœ], œtypeœ: œqueryœ}" }, { "className": "", "data": { "sourceHandle": { "dataType": "AstraDB", - "id": "AstraDB-Lm2es", + "id": "AstraDB-CLCyc", "name": "dataframe", "output_types": [ "DataFrame" @@ -217,7 +215,7 @@ }, "targetHandle": { "fieldName": "input_data", - "id": "parser-LlSmH", + "id": "parser-gcmKF", "inputTypes": [ "DataFrame", "Data" @@ -225,11 +223,11 @@ "type": "other" } }, - "id": 
"xy-edge__AstraDB-Lm2es{œdataTypeœ:œAstraDBœ,œidœ:œAstraDB-Lm2esœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-parser-LlSmH{œfieldNameœ:œinput_dataœ,œidœ:œparser-LlSmHœ,œinputTypesœ:[œDataFrameœ,œDataœ],œtypeœ:œotherœ}", - "source": "AstraDB-Lm2es", - "sourceHandle": "{œdataTypeœ: œAstraDBœ, œidœ: œAstraDB-Lm2esœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}", - "target": "parser-LlSmH", - "targetHandle": "{œfieldNameœ: œinput_dataœ, œidœ: œparser-LlSmHœ, œinputTypesœ: [œDataFrameœ, œDataœ], œtypeœ: œotherœ}" + "id": "xy-edge__AstraDB-CLCyc{œdataTypeœ:œAstraDBœ,œidœ:œAstraDB-CLCycœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-parser-gcmKF{œfieldNameœ:œinput_dataœ,œidœ:œparser-gcmKFœ,œinputTypesœ:[œDataFrameœ,œDataœ],œtypeœ:œotherœ}", + "source": "AstraDB-CLCyc", + "sourceHandle": "{œdataTypeœ: œAstraDBœ, œidœ: œAstraDB-CLCycœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}", + "target": "parser-gcmKF", + "targetHandle": "{œfieldNameœ: œinput_dataœ, œidœ: œparser-gcmKFœ, œinputTypesœ: [œDataFrameœ, œDataœ], œtypeœ: œotherœ}" } ], "nodes": [ @@ -237,7 +235,7 @@ "data": { "description": "Get chat inputs from the Playground.", "display_name": "Chat Input", - "id": "ChatInput-w1dxV", + "id": "ChatInput-insg9", "node": { "base_classes": [ "Message" @@ -516,7 +514,7 @@ }, "dragging": false, "height": 234, - "id": "ChatInput-w1dxV", + "id": "ChatInput-insg9", "measured": { "height": 234, "width": 320 @@ -537,7 +535,7 @@ "data": { "description": "Create a prompt template with dynamic variables.", "display_name": "Prompt", - "id": "Prompt-jBCaI", + "id": "Prompt-wgAUs", "node": { "base_classes": [ "Message" @@ -701,7 +699,7 @@ }, "dragging": false, "height": 433, - "id": "Prompt-jBCaI", + "id": "Prompt-wgAUs", "measured": { "height": 433, "width": 320 @@ -722,7 +720,7 @@ "data": { "description": "Split text into chunks based on specified criteria.", "display_name": "Split Text", - "id": "SplitText-YWbhC", + "id": "SplitText-50nDI", "node": { "base_classes": [ "Data" @@ -924,7 +922,7 @@ }, "dragging": false, "height": 475, - "id": "SplitText-YWbhC", + "id": "SplitText-50nDI", "measured": { "height": 475, "width": 320 @@ -943,7 +941,7 @@ }, { "data": { - "id": "note-vztSC", + "id": "note-0MDGm", "node": { "description": "## 🐕 2. Retriever Flow\n\nThis flow answers your questions with contextual data retrieved from your vector database.\n\nOpen the **Playground** and ask, \n\n```\nWhat is this document about?\n```\n", "display_name": "", @@ -956,7 +954,7 @@ }, "dragging": false, "height": 324, - "id": "note-vztSC", + "id": "note-0MDGm", "measured": { "height": 324, "width": 324 @@ -980,7 +978,7 @@ }, { "data": { - "id": "note-4b71P", + "id": "note-mSH4A", "node": { "description": "Retrieval Augmented Generation (RAG) is a way of providing additional context to a Large Language Model (LLM) by preloading a vector database with embeddings for relevant content. When a user chats with the LLM, a _similarity search_ retrieves relevant content by comparing an embedding for the user's query against the embeddings in the vector database.\nFor example, a RAG chatbot could be pre-loaded with product data, and then it can help customers find specific products based on their queries.\nThis template has two sub-flows. One flow loads data into your vector store, and the other is the user-driven chat flow that compares a new query against the existing content in your vector database.\n\n## Quickstart\n1. 
Add your OpenAI API key to the **Language Model** component and the two **Embeddings** components.\n2. Add an Astra application token to the **Astra DB** vector store components, or replace these components with other vector store components available in the **Components** menu.\n**💡 Store your credentials as Langflow global variables 🌐 to simplify token management and reuse in your flows.**\n\n## Run the flows\n1. Load your data into a vector database with the 📚 **Load Data** flow. Select a file to upload in the **File** component, and then click **Play** ▶️ on the **Astra DB** component to run the **Load Data** flow.\n2. Open the **Playground** to start a chat with the 🐕 **Retriever** flow.\n\nOnly run the **Load Data** flow when you need to populate your vector database with baseline content, such as product data.\nThe **Retriever** flow is the user-facing chat flow. This flow generates an embedding from chat input, runs a similarity search against the vector database to retrieve relevant content, and then passes the original query and the retrieved content to the LLM, which produces the chat response sent to the user.\n\n## Next steps\nExperiment by changing the prompt and the loaded data to see how the LLM's responses change.", "display_name": "Read Me", @@ -993,7 +991,7 @@ }, "dragging": false, "height": 556, - "id": "note-4b71P", + "id": "note-mSH4A", "measured": { "height": 556, "width": 389 @@ -1019,7 +1017,7 @@ "data": { "description": "Display a chat message in the Playground.", "display_name": "Chat Output", - "id": "ChatOutput-jIJTT", + "id": "ChatOutput-VG394", "node": { "base_classes": [ "Message" @@ -1322,7 +1320,7 @@ }, "dragging": false, "height": 234, - "id": "ChatOutput-jIJTT", + "id": "ChatOutput-VG394", "measured": { "height": 234, "width": 320 @@ -1341,7 +1339,7 @@ }, { "data": { - "id": "OpenAIEmbeddings-d0XPm", + "id": "OpenAIEmbeddings-y2ymc", "node": { "base_classes": [ "Embeddings" @@ -1834,7 +1832,7 @@ }, "dragging": false, "height": 320, - "id": "OpenAIEmbeddings-d0XPm", + "id": "OpenAIEmbeddings-y2ymc", "measured": { "height": 320, "width": 320 @@ -1853,7 +1851,7 @@ }, { "data": { - "id": "note-AeLDI", + "id": "note-8ieVo", "node": { "description": "## 📚 1. Load Data Flow\n\nRun this first! Load data from a local file and embed it into the vector database.\n\nSelect a Database and a Collection, or create new ones.
\n\nClick **Run component** on the **Astra DB** component to load your data.\n\n\n### Next steps:\n Experiment by changing the prompt and the contextual data to see how the retrieval flow's responses change.", "display_name": "", @@ -1866,7 +1864,7 @@ }, "dragging": false, "height": 460, - "id": "note-AeLDI", + "id": "note-8ieVo", "measured": { "height": 460, "width": 340 @@ -1890,7 +1888,7 @@ }, { "data": { - "id": "OpenAIEmbeddings-8Zseb", + "id": "OpenAIEmbeddings-cusQX", "node": { "base_classes": [ "Embeddings" @@ -2383,7 +2381,7 @@ }, "dragging": false, "height": 320, - "id": "OpenAIEmbeddings-8Zseb", + "id": "OpenAIEmbeddings-cusQX", "measured": { "height": 320, "width": 320 @@ -2402,7 +2400,7 @@ }, { "data": { - "id": "note-Dkwdo", + "id": "note-Sk1It", "node": { "description": "### 💡 Add your OpenAI API key here 👇", "display_name": "", @@ -2415,7 +2413,7 @@ }, "dragging": false, "height": 324, - "id": "note-Dkwdo", + "id": "note-Sk1It", "measured": { "height": 324, "width": 324 @@ -2434,7 +2432,7 @@ }, { "data": { - "id": "note-YLhWR", + "id": "note-qmLAt", "node": { "description": "### 💡 Add your OpenAI API key here 👇", "display_name": "", @@ -2447,7 +2445,7 @@ }, "dragging": false, "height": 324, - "id": "note-YLhWR", + "id": "note-qmLAt", "measured": { "height": 324, "width": 324 @@ -2466,7 +2464,7 @@ }, { "data": { - "id": "note-Il7AR", + "id": "note-UCSuM", "node": { "description": "### 💡 Add your OpenAI API key here 👇", "display_name": "", @@ -2479,7 +2477,7 @@ }, "dragging": false, "height": 324, - "id": "note-Il7AR", + "id": "note-UCSuM", "measured": { "height": 324, "width": 324 @@ -2498,7 +2496,7 @@ }, { "data": { - "id": "parser-LlSmH", + "id": "parser-gcmKF", "node": { "base_classes": [ "Message" @@ -2660,7 +2658,7 @@ "type": "parser" }, "dragging": false, - "id": "parser-LlSmH", + "id": "parser-gcmKF", "measured": { "height": 361, "width": 320 @@ -2674,7 +2672,7 @@ }, { "data": { - "id": "File-TyJTr", + "id": "File-wqFzl", "node": { "base_classes": [ "Message" @@ -2699,7 +2697,7 @@ ], "frozen": false, "icon": "file-text", - "last_updated": "2025-08-26T16:46:21.648Z", + "last_updated": "2025-08-27T14:19:16.203Z", "legacy": false, "metadata": { "code_hash": "23fbe9daca09", @@ -2765,7 +2763,7 @@ "show": true, "title_case": false, "type": "code", - "value": "\"\"\"Enhanced file component v2 with mypy and ruff compliance.\"\"\"\n\nfrom __future__ import annotations\n\nfrom copy import deepcopy\nfrom enum import Enum\nfrom typing import TYPE_CHECKING, Any\n\nfrom langflow.base.data.base_file import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n FileInput,\n IntInput,\n MessageTextInput,\n Output,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom langflow.schema.message import Message\n\nif TYPE_CHECKING:\n from langflow.schema import DataFrame\n\n\nclass MockConversionStatus(Enum):\n \"\"\"Mock ConversionStatus for fallback compatibility.\"\"\"\n\n SUCCESS = \"success\"\n FAILURE = \"failure\"\n\n\nclass MockInputFormat(Enum):\n \"\"\"Mock InputFormat for fallback compatibility.\"\"\"\n\n PDF = \"pdf\"\n IMAGE = \"image\"\n\n\nclass MockImageRefMode(Enum):\n \"\"\"Mock ImageRefMode for fallback compatibility.\"\"\"\n\n PLACEHOLDER = \"placeholder\"\n EMBEDDED = \"embedded\"\n\n\nclass DoclingImports:\n \"\"\"Container for docling imports with type information.\"\"\"\n\n def __init__(\n self,\n conversion_status: type[Enum],\n 
input_format: type[Enum],\n document_converter: type,\n image_ref_mode: type[Enum],\n strategy: str,\n ) -> None:\n self.conversion_status = conversion_status\n self.input_format = input_format\n self.document_converter = document_converter\n self.image_ref_mode = image_ref_mode\n self.strategy = strategy\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Enhanced file component v2 that combines standard file loading with optional Docling processing and export.\n\n This component supports all features of the standard File component, plus an advanced mode\n that enables Docling document processing and export to various formats (Markdown, HTML, etc.).\n \"\"\"\n\n display_name = \"File\"\n description = \"Loads content from files with optional advanced document processing and export using Docling.\"\n documentation: str = \"https://docs.langflow.org/components-data#file\"\n icon = \"file-text\"\n name = \"File\"\n\n # Docling supported formats from original component\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n *TEXT_FILE_TYPES,\n ]\n\n # Fixed export settings\n EXPORT_FORMAT = \"Markdown\"\n IMAGE_MODE = \"placeholder\"\n\n _base_inputs = deepcopy(BaseFileComponent._base_inputs)\n\n for input_item in _base_inputs:\n if isinstance(input_item, FileInput) and input_item.name == \"path\":\n input_item.real_time_refresh = True\n break\n\n inputs = [\n *_base_inputs,\n BoolInput(\n name=\"advanced_mode\",\n display_name=\"Advanced Parser\",\n value=False,\n real_time_refresh=True,\n info=(\n \"Enable advanced document processing and export with Docling for PDFs, images, and office documents. \"\n \"Available only for single file processing.\"\n ),\n show=False,\n ),\n DropdownInput(\n name=\"pipeline\",\n display_name=\"Pipeline\",\n info=\"Docling pipeline to use\",\n options=[\"standard\", \"vlm\"],\n value=\"standard\",\n advanced=True,\n ),\n DropdownInput(\n name=\"ocr_engine\",\n display_name=\"OCR Engine\",\n info=\"OCR engine to use. 
Only available when pipeline is set to 'standard'.\",\n options=[\"\", \"easyocr\"],\n value=\"\",\n show=False,\n advanced=True,\n ),\n StrInput(\n name=\"md_image_placeholder\",\n display_name=\"Image placeholder\",\n info=\"Specify the image placeholder for markdown exports.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n StrInput(\n name=\"md_page_break_placeholder\",\n display_name=\"Page break placeholder\",\n info=\"Add this placeholder between pages in the markdown output.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n MessageTextInput(\n name=\"doc_key\",\n display_name=\"Doc Key\",\n info=\"The key to use for the DoclingDocument column.\",\n value=\"doc\",\n advanced=True,\n show=False,\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n BoolInput(\n name=\"markdown\",\n display_name=\"Markdown Export\",\n info=\"Export processed documents to Markdown format. Only available when advanced mode is enabled.\",\n value=False,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\"),\n ]\n\n def _path_value(self, template) -> list[str]:\n # Get current path value\n return template.get(\"path\", {}).get(\"file_path\", [])\n\n def update_build_config(\n self,\n build_config: dict[str, Any],\n field_value: Any,\n field_name: str | None = None,\n ) -> dict[str, Any]:\n \"\"\"Update build configuration to show/hide fields based on file count and advanced_mode.\"\"\"\n if field_name == \"path\":\n # Get current path value\n path_value = self._path_value(build_config)\n file_path = path_value[0] if len(path_value) > 0 else \"\"\n\n # Show/hide Advanced Parser based on file count (only for single files)\n file_count = len(field_value) if field_value else 0\n if file_count == 1 and not file_path.endswith((\".csv\", \".xlsx\", \".parquet\")):\n build_config[\"advanced_mode\"][\"show\"] = True\n else:\n build_config[\"advanced_mode\"][\"show\"] = False\n build_config[\"advanced_mode\"][\"value\"] = False # Reset to False when hidden\n\n # Hide all advanced fields when Advanced Parser is not available\n advanced_fields = [\n \"pipeline\",\n \"ocr_engine\",\n \"doc_key\",\n \"md_image_placeholder\",\n \"md_page_break_placeholder\",\n ]\n for field in advanced_fields:\n if field in build_config:\n build_config[field][\"show\"] = False\n\n elif field_name == \"advanced_mode\":\n # Show/hide advanced fields based on advanced_mode (only if single file)\n advanced_fields = [\n \"pipeline\",\n \"ocr_engine\",\n \"doc_key\",\n \"md_image_placeholder\",\n \"md_page_break_placeholder\",\n ]\n\n for field in advanced_fields:\n if field in build_config:\n build_config[field][\"show\"] = field_value\n\n return build_config\n\n def update_outputs(self, frontend_node: dict[str, Any], field_name: str, field_value: Any) -> dict[str, Any]: # noqa: ARG002\n \"\"\"Dynamically show outputs based on the number of files and their types.\"\"\"\n if field_name not in [\"path\", \"advanced_mode\"]:\n return frontend_node\n\n # Add outputs based on the number of files in the path\n template = frontend_node.get(\"template\", {})\n path_value = 
self._path_value(template)\n if len(path_value) == 0:\n return frontend_node\n\n # Clear existing outputs\n frontend_node[\"outputs\"] = []\n\n if len(path_value) == 1:\n # We need to check if the file is structured content\n file_path = path_value[0] if field_name == \"path\" else frontend_node[\"template\"][\"path\"][\"file_path\"][0]\n if file_path.endswith((\".csv\", \".xlsx\", \".parquet\")):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"dataframe\", method=\"load_files_structured\"),\n )\n elif file_path.endswith(\".json\"):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"json\", method=\"load_files_json\"),\n )\n\n # Add outputs based on advanced mode\n advanced_mode = frontend_node.get(\"template\", {}).get(\"advanced_mode\", {}).get(\"value\", False)\n\n if advanced_mode:\n # Advanced mode: Structured Output, Markdown, and File Path\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Output\", name=\"advanced\", method=\"load_files_advanced\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Markdown\", name=\"markdown\", method=\"load_files_markdown\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\"),\n )\n else:\n # Normal mode: Raw Content and File Path\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\"),\n )\n else:\n # For multiple files, we show the files output (DataFrame format)\n # Advanced Parser is not available for multiple files\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Files\", name=\"dataframe\", method=\"load_files\"),\n )\n\n return frontend_node\n\n def _try_import_docling(self) -> DoclingImports | None:\n \"\"\"Try different import strategies for docling components.\"\"\"\n # Try strategy 1: Latest docling structure\n try:\n from docling.datamodel.base_models import ConversionStatus, InputFormat # type: ignore[import-untyped]\n from docling.document_converter import DocumentConverter # type: ignore[import-untyped]\n from docling_core.types.doc import ImageRefMode # type: ignore[import-untyped]\n\n self.log(\"Using latest docling import structure\")\n return DoclingImports(\n conversion_status=ConversionStatus,\n input_format=InputFormat,\n document_converter=DocumentConverter,\n image_ref_mode=ImageRefMode,\n strategy=\"latest\",\n )\n except ImportError as e:\n self.log(f\"Latest docling structure failed: {e}\")\n\n # Try strategy 2: Alternative import paths\n try:\n from docling.document_converter import DocumentConverter # type: ignore[import-untyped]\n from docling_core.types.doc import ImageRefMode # type: ignore[import-untyped]\n\n # Try to get ConversionStatus from different locations\n conversion_status: type[Enum] = MockConversionStatus\n input_format: type[Enum] = MockInputFormat\n\n try:\n from docling_core.types import ConversionStatus, InputFormat # type: ignore[import-untyped]\n\n conversion_status = ConversionStatus\n input_format = InputFormat\n except ImportError:\n try:\n from docling.datamodel import ConversionStatus, InputFormat # type: ignore[import-untyped]\n\n conversion_status = ConversionStatus\n input_format = InputFormat\n except ImportError:\n # Use mock enums if we can't find them\n pass\n\n self.log(\"Using alternative docling 
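The tiered imports in `_try_import_docling` exist because docling has moved `ConversionStatus` and `InputFormat` between modules across releases; each strategy probes one layout and the chain ends in mock enums so the component stays importable without docling. The shape of that pattern, reduced to stdlib-only code (the candidate module paths below echo the strategies above but are illustrative):

```python
from enum import Enum


class MockStatus(Enum):
    # Fallback stand-in used when the real enum cannot be located.
    SUCCESS = "success"
    FAILURE = "failure"


def resolve_status_enum() -> type[Enum]:
    """Try progressively older import locations, ending in a mock."""
    candidates = (
        "docling.datamodel.base_models",  # newest layout
        "docling_core.types",             # intermediate layout
        "docling.datamodel",              # oldest layout
    )
    for module_path in candidates:
        try:
            module = __import__(module_path, fromlist=["ConversionStatus"])
            return module.ConversionStatus
        except (ImportError, AttributeError):
            continue
    return MockStatus  # keep the component usable without docling


status_enum = resolve_status_enum()
```

Catching `AttributeError` alongside `ImportError` is what lets a strategy fail cleanly when the module exists but the symbol moved.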
import structure\")\n return DoclingImports(\n conversion_status=conversion_status,\n input_format=input_format,\n document_converter=DocumentConverter,\n image_ref_mode=ImageRefMode,\n strategy=\"alternative\",\n )\n except ImportError as e:\n self.log(f\"Alternative docling structure failed: {e}\")\n\n # Try strategy 3: Basic converter only\n try:\n from docling.document_converter import DocumentConverter # type: ignore[import-untyped]\n\n self.log(\"Using basic docling import structure with mocks\")\n return DoclingImports(\n conversion_status=MockConversionStatus,\n input_format=MockInputFormat,\n document_converter=DocumentConverter,\n image_ref_mode=MockImageRefMode,\n strategy=\"basic\",\n )\n except ImportError as e:\n self.log(f\"Basic docling structure failed: {e}\")\n\n # Strategy 4: Complete fallback - return None to indicate failure\n return None\n\n def _create_advanced_converter(self, docling_imports: DoclingImports) -> Any:\n \"\"\"Create advanced converter with pipeline options if available.\"\"\"\n try:\n from docling.datamodel.pipeline_options import PdfPipelineOptions # type: ignore[import-untyped]\n from docling.document_converter import PdfFormatOption # type: ignore[import-untyped]\n\n document_converter = docling_imports.document_converter\n input_format = docling_imports.input_format\n\n # Create basic pipeline options\n pipeline_options = PdfPipelineOptions()\n\n # Configure OCR if specified and available\n if self.ocr_engine:\n try:\n from docling.models.factories import get_ocr_factory # type: ignore[import-untyped]\n\n pipeline_options.do_ocr = True\n ocr_factory = get_ocr_factory(allow_external_plugins=False)\n ocr_options = ocr_factory.create_options(kind=self.ocr_engine)\n pipeline_options.ocr_options = ocr_options\n self.log(f\"Configured OCR with engine: {self.ocr_engine}\")\n except Exception as e: # noqa: BLE001\n self.log(f\"Could not configure OCR: {e}, proceeding without OCR\")\n pipeline_options.do_ocr = False\n\n # Create format options\n pdf_format_option = PdfFormatOption(pipeline_options=pipeline_options)\n format_options = {}\n if hasattr(input_format, \"PDF\"):\n format_options[input_format.PDF] = pdf_format_option\n if hasattr(input_format, \"IMAGE\"):\n format_options[input_format.IMAGE] = pdf_format_option\n\n return document_converter(format_options=format_options)\n\n except Exception as e: # noqa: BLE001\n self.log(f\"Could not create advanced converter: {e}, using basic converter\")\n return docling_imports.document_converter()\n\n def _is_docling_compatible(self, file_path: str) -> bool:\n \"\"\"Check if file is compatible with Docling processing.\"\"\"\n # All VALID_EXTENSIONS are Docling compatible (except for TEXT_FILE_TYPES which may overlap)\n docling_extensions = [\n \".adoc\",\n \".asciidoc\",\n \".asc\",\n \".bmp\",\n \".csv\",\n \".dotx\",\n \".dotm\",\n \".docm\",\n \".docx\",\n \".htm\",\n \".html\",\n \".jpeg\",\n \".json\",\n \".md\",\n \".pdf\",\n \".png\",\n \".potx\",\n \".ppsx\",\n \".pptm\",\n \".potm\",\n \".ppsm\",\n \".pptx\",\n \".tiff\",\n \".txt\",\n \".xls\",\n \".xlsx\",\n \".xhtml\",\n \".xml\",\n \".webp\",\n ]\n return any(file_path.lower().endswith(ext) for ext in docling_extensions)\n\n def process_files(\n self,\n file_list: list[BaseFileComponent.BaseFile],\n ) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Process files using standard parsing or Docling based on advanced_mode and file type.\"\"\"\n\n def process_file_standard(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Process 
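The OCR wiring in `_create_advanced_converter` is deliberately defensive: OCR is enabled only if the factory can construct options for the requested engine, and any failure downgrades to a non-OCR pipeline instead of failing the build. A compressed sketch of that degrade-gracefully step; it reuses the `get_ocr_factory`/`create_options` calls shown above and assumes docling is importable, with a minimal stand-in object so the snippet runs either way:

```python
def configure_ocr(pipeline_options, engine: str) -> None:
    """Enable OCR on a PdfPipelineOptions-like object only when the
    requested engine can be constructed; never fail the whole build."""
    if not engine:
        pipeline_options.do_ocr = False
        return
    try:
        # Same factory call used by the component above.
        from docling.models.factories import get_ocr_factory

        factory = get_ocr_factory(allow_external_plugins=False)
        pipeline_options.ocr_options = factory.create_options(kind=engine)
        pipeline_options.do_ocr = True
    except Exception as exc:  # import errors, unknown engine, plugin issues
        print(f"OCR unavailable ({exc}); continuing without OCR")
        pipeline_options.do_ocr = False


class _Opts:
    # Minimal stand-in for PdfPipelineOptions, for this sketch only.
    do_ocr = False
    ocr_options = None


configure_ocr(_Opts(), "easyocr")
```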
a single file using standard text parsing.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n def process_file_docling(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Process a single file using Docling if compatible, otherwise standard processing.\"\"\"\n # Try Docling first if file is compatible and advanced mode is enabled\n try:\n return self._process_with_docling_and_export(file_path)\n except Exception as e: # noqa: BLE001\n self.log(f\"Docling processing failed for {file_path}: {e}, falling back to standard processing\")\n if not silent_errors:\n # Return error data instead of raising\n return Data(data={\"error\": f\"Docling processing failed: {e}\", \"file_path\": file_path})\n\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n file_path = str(file_list[0].path)\n if self.advanced_mode and self._is_docling_compatible(file_path):\n processed_data = process_file_docling(file_path)\n if not processed_data:\n msg = f\"Failed to process file with Docling: {file_path}\"\n raise ValueError(msg)\n\n # Serialize processed data to match Data structure\n serialized_data = processed_data.serialize_model()\n\n # Now, if doc is nested, we need to unravel it\n clean_data: list[Data | None] = [processed_data]\n\n # This is where we've manually processed the data\n try:\n if \"exported_content\" not in serialized_data:\n clean_data = [\n Data(\n data={\n \"file_path\": file_path,\n **(\n item[\"element\"]\n if \"element\" in item\n else {k: v for k, v in item.items() if k != \"file_path\"}\n ),\n }\n )\n for item in serialized_data[\"doc\"]\n ]\n except Exception as _: # noqa: BLE001\n raise ValueError(serialized_data) from None\n\n # Repeat file_list to match the number of processed data elements\n final_data: list[Data | None] = clean_data\n return self.rollup_data(file_list, final_data)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n my_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file_standard,\n max_concurrency=concurrency,\n )\n\n return self.rollup_data(file_list, my_data)\n\n def load_files_advanced(self) -> DataFrame:\n \"\"\"Load files using advanced Docling processing and export to an advanced format.\"\"\"\n # TODO: Update\n self.markdown = False\n return self.load_files()\n\n def load_files_markdown(self) -> Message:\n \"\"\"Load files using advanced Docling processing and export to Markdown format.\"\"\"\n self.markdown = True\n result = self.load_files()\n return Message(text=str(result.text[0]))\n\n def _process_with_docling_and_export(self, file_path: str) -> Data:\n \"\"\"Process a single file with Docling and export to the specified format.\"\"\"\n # Import docling components only when needed\n docling_imports = self._try_import_docling()\n\n if docling_imports is None:\n msg = \"Docling not available for advanced processing\"\n raise ImportError(msg)\n\n conversion_status = 
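For the multi-file path, `process_files` derives its worker count from the deprecated `use_multithreading` flag combined with the newer `concurrency_multithreading` integer. The effective rule, extracted as a pure function with a thread-pool sketch (the `load` callable stands in for `parse_text_file_to_data`):

```python
from concurrent.futures import ThreadPoolExecutor


def effective_concurrency(use_multithreading: bool, configured: int) -> int:
    """1 when multithreading is off; otherwise at least 1."""
    return 1 if not use_multithreading else max(1, configured)


def load_all(paths: list[str], load, workers: int) -> list:
    # Order-preserving parallel load, analogous to parallel_load_data.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(load, paths))


assert effective_concurrency(False, 8) == 1
assert effective_concurrency(True, 0) == 1
results = load_all(["a.txt", "b.txt"], lambda p: p.upper(),
                   effective_concurrency(True, 4))
```

Note that the deprecated flag defaults to `True`, so in practice the dial users actually turn is `concurrency_multithreading`, as the input's own info text says.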
docling_imports.conversion_status\n document_converter = docling_imports.document_converter\n image_ref_mode = docling_imports.image_ref_mode\n\n try:\n # Create converter based on strategy and pipeline setting\n if docling_imports.strategy == \"latest\" and self.pipeline == \"standard\":\n converter = self._create_advanced_converter(docling_imports)\n else:\n # Use basic converter for compatibility\n converter = document_converter()\n self.log(\"Using basic DocumentConverter for Docling processing\")\n\n # Process single file\n result = converter.convert(file_path)\n\n # Check if conversion was successful\n success = False\n if hasattr(result, \"status\"):\n if hasattr(conversion_status, \"SUCCESS\"):\n success = result.status == conversion_status.SUCCESS\n else:\n success = str(result.status).lower() == \"success\"\n elif hasattr(result, \"document\"):\n # If no status but has document, assume success\n success = result.document is not None\n\n if not success:\n return Data(data={\"error\": \"Docling conversion failed\", \"file_path\": file_path})\n\n if self.markdown:\n self.log(\"Exporting document to Markdown format\")\n # Export the document to the specified format\n exported_content = self._export_document(result.document, image_ref_mode)\n\n return Data(\n text=exported_content,\n data={\n \"exported_content\": exported_content,\n \"export_format\": self.EXPORT_FORMAT,\n \"file_path\": file_path,\n },\n )\n\n return Data(\n data={\n \"doc\": self.docling_to_dataframe_simple(result.document.export_to_dict()),\n \"export_format\": self.EXPORT_FORMAT,\n \"file_path\": file_path,\n }\n )\n\n except Exception as e: # noqa: BLE001\n return Data(data={\"error\": f\"Docling processing error: {e!s}\", \"file_path\": file_path})\n\n def docling_to_dataframe_simple(self, doc):\n \"\"\"Extract all text elements into a simple DataFrame.\"\"\"\n return [\n {\n \"page_no\": text[\"prov\"][0][\"page_no\"] if text[\"prov\"] else None,\n \"label\": text[\"label\"],\n \"text\": text[\"text\"],\n \"level\": text.get(\"level\", None), # for headers\n }\n for text in doc[\"texts\"]\n ]\n\n def _export_document(self, document: Any, image_ref_mode: type[Enum]) -> str:\n \"\"\"Export document to Markdown format with placeholder images.\"\"\"\n try:\n image_mode = (\n image_ref_mode(self.IMAGE_MODE) if hasattr(image_ref_mode, self.IMAGE_MODE) else self.IMAGE_MODE\n )\n\n # Always export to Markdown since it's fixed\n return document.export_to_markdown(\n image_mode=image_mode,\n image_placeholder=self.md_image_placeholder,\n page_break_placeholder=self.md_page_break_placeholder,\n )\n\n except Exception as e: # noqa: BLE001\n self.log(f\"Markdown export failed: {e}, using basic text export\")\n # Fallback to basic text export\n try:\n return document.export_to_text()\n except Exception: # noqa: BLE001\n return str(document)\n" + "value": "\"\"\"Enhanced file component with clearer structure and Docling isolation.\n\nNotes:\n-----\n- Functionality is preserved with minimal behavioral changes.\n- ALL Docling parsing/export runs in a separate OS process to prevent memory\n growth and native library state from impacting the main Langflow process.\n- Standard text/structured parsing continues to use existing BaseFileComponent\n utilities (and optional threading via `parallel_load_data`).\n\"\"\"\n\nfrom __future__ import annotations\n\nimport json\nimport subprocess\nimport sys\nimport textwrap\nfrom copy import deepcopy\nfrom typing import TYPE_CHECKING, Any\n\nfrom langflow.base.data.base_file import 
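`docling_to_dataframe_simple` flattens the dict produced by `export_to_dict()` into one row per text element. A standalone version with a toy payload shows the input shape it expects (the payload keys mirror the fields used above):

```python
def texts_to_rows(doc: dict) -> list[dict]:
    """One row per text element: page number, semantic label, text, header level."""
    return [
        {
            "page_no": t["prov"][0]["page_no"] if t["prov"] else None,
            "label": t["label"],
            "text": t["text"],
            "level": t.get("level"),  # present only for section headers
        }
        for t in doc["texts"]
    ]


doc = {
    "texts": [
        {"prov": [{"page_no": 1}], "label": "section_header", "text": "Intro", "level": 1},
        {"prov": [], "label": "paragraph", "text": "Body text."},
    ]
}
print(texts_to_rows(doc))
```

Keeping the rows as plain dicts defers the pandas dependency to whoever actually needs a DataFrame, which is why the method returns a list rather than constructing one itself.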
BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n FileInput,\n IntInput,\n MessageTextInput,\n Output,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom langflow.schema.message import Message\n\nif TYPE_CHECKING:\n from langflow.schema import DataFrame\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"File component with optional Docling processing (isolated in a subprocess).\"\"\"\n\n display_name = \"File\"\n description = \"Loads content from files with optional advanced document processing and export using Docling.\"\n documentation: str = \"https://docs.langflow.org/components-data#file\"\n icon = \"file-text\"\n name = \"File\"\n\n # Docling-supported/compatible extensions; TEXT_FILE_TYPES are supported by the base loader.\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n *TEXT_FILE_TYPES,\n ]\n\n # Fixed export settings used when markdown export is requested.\n EXPORT_FORMAT = \"Markdown\"\n IMAGE_MODE = \"placeholder\"\n\n # ---- Inputs / Outputs (kept as close to original as possible) -------------------\n _base_inputs = deepcopy(BaseFileComponent._base_inputs)\n for input_item in _base_inputs:\n if isinstance(input_item, FileInput) and input_item.name == \"path\":\n input_item.real_time_refresh = True\n break\n\n inputs = [\n *_base_inputs,\n BoolInput(\n name=\"advanced_mode\",\n display_name=\"Advanced Parser\",\n value=False,\n real_time_refresh=True,\n info=(\n \"Enable advanced document processing and export with Docling for PDFs, images, and office documents. \"\n \"Available only for single file processing.\"\n ),\n show=False,\n ),\n DropdownInput(\n name=\"pipeline\",\n display_name=\"Pipeline\",\n info=\"Docling pipeline to use\",\n options=[\"standard\", \"vlm\"],\n value=\"standard\",\n advanced=True,\n ),\n DropdownInput(\n name=\"ocr_engine\",\n display_name=\"OCR Engine\",\n info=\"OCR engine to use. 
Only available when pipeline is set to 'standard'.\",\n options=[\"\", \"easyocr\"],\n value=\"\",\n show=False,\n advanced=True,\n ),\n StrInput(\n name=\"md_image_placeholder\",\n display_name=\"Image placeholder\",\n info=\"Specify the image placeholder for markdown exports.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n StrInput(\n name=\"md_page_break_placeholder\",\n display_name=\"Page break placeholder\",\n info=\"Add this placeholder between pages in the markdown output.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n MessageTextInput(\n name=\"doc_key\",\n display_name=\"Doc Key\",\n info=\"The key to use for the DoclingDocument column.\",\n value=\"doc\",\n advanced=True,\n show=False,\n ),\n # Deprecated input retained for backward-compatibility.\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n BoolInput(\n name=\"markdown\",\n display_name=\"Markdown Export\",\n info=\"Export processed documents to Markdown format. Only available when advanced mode is enabled.\",\n value=False,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\"),\n ]\n\n # ------------------------------ UI helpers --------------------------------------\n\n def _path_value(self, template: dict) -> list[str]:\n \"\"\"Return the list of currently selected file paths from the template.\"\"\"\n return template.get(\"path\", {}).get(\"file_path\", [])\n\n def update_build_config(\n self,\n build_config: dict[str, Any],\n field_value: Any,\n field_name: str | None = None,\n ) -> dict[str, Any]:\n \"\"\"Show/hide Advanced Parser and related fields based on selection context.\"\"\"\n if field_name == \"path\":\n paths = self._path_value(build_config)\n file_path = paths[0] if paths else \"\"\n file_count = len(field_value) if field_value else 0\n\n # Advanced mode only for single (non-tabular) file\n allow_advanced = file_count == 1 and not file_path.endswith((\".csv\", \".xlsx\", \".parquet\"))\n build_config[\"advanced_mode\"][\"show\"] = allow_advanced\n if not allow_advanced:\n build_config[\"advanced_mode\"][\"value\"] = False\n for f in (\"pipeline\", \"ocr_engine\", \"doc_key\", \"md_image_placeholder\", \"md_page_break_placeholder\"):\n if f in build_config:\n build_config[f][\"show\"] = False\n\n elif field_name == \"advanced_mode\":\n for f in (\"pipeline\", \"ocr_engine\", \"doc_key\", \"md_image_placeholder\", \"md_page_break_placeholder\"):\n if f in build_config:\n build_config[f][\"show\"] = bool(field_value)\n\n return build_config\n\n def update_outputs(self, frontend_node: dict[str, Any], field_name: str, field_value: Any) -> dict[str, Any]: # noqa: ARG002\n \"\"\"Dynamically show outputs based on file count/type and advanced mode.\"\"\"\n if field_name not in [\"path\", \"advanced_mode\"]:\n return frontend_node\n\n template = frontend_node.get(\"template\", {})\n paths = self._path_value(template)\n if not paths:\n return frontend_node\n\n frontend_node[\"outputs\"] = []\n if len(paths) == 1:\n file_path = paths[0] if field_name == \"path\" else frontend_node[\"template\"][\"path\"][\"file_path\"][0]\n if 
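`update_outputs` rebuilds the outputs list from scratch on each refresh rather than mutating entries in place, which keeps the mapping from UI state to outputs declarative. Its decision table as a pure function (output names and methods are copied from the component; the tuple return is just for this sketch):

```python
def select_outputs(paths: list[str], advanced: bool) -> list[tuple[str, str]]:
    """(name, method) pairs mirroring the component's output switching."""
    if len(paths) != 1:
        return [("dataframe", "load_files")]  # multiple files => DataFrame
    outputs: list[tuple[str, str]] = []
    first = paths[0]
    if first.endswith((".csv", ".xlsx", ".parquet")):
        outputs.append(("dataframe", "load_files_structured"))
    elif first.endswith(".json"):
        outputs.append(("json", "load_files_json"))
    if advanced:
        outputs += [("advanced", "load_files_advanced"),
                    ("markdown", "load_files_markdown"),
                    ("path", "load_files_path")]
    else:
        outputs += [("message", "load_files_message"),
                    ("path", "load_files_path")]
    return outputs


assert select_outputs(["a.pdf", "b.pdf"], False) == [("dataframe", "load_files")]
assert ("markdown", "load_files_markdown") in select_outputs(["a.pdf"], True)
```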
file_path.endswith((\".csv\", \".xlsx\", \".parquet\")):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"dataframe\", method=\"load_files_structured\"),\n )\n elif file_path.endswith(\".json\"):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"json\", method=\"load_files_json\"),\n )\n\n advanced_mode = frontend_node.get(\"template\", {}).get(\"advanced_mode\", {}).get(\"value\", False)\n if advanced_mode:\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Output\", name=\"advanced\", method=\"load_files_advanced\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Markdown\", name=\"markdown\", method=\"load_files_markdown\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\"),\n )\n else:\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\"),\n )\n else:\n # Multiple files => DataFrame output; advanced parser disabled\n frontend_node[\"outputs\"].append(Output(display_name=\"Files\", name=\"dataframe\", method=\"load_files\"))\n\n return frontend_node\n\n # ------------------------------ Core processing ----------------------------------\n\n def _is_docling_compatible(self, file_path: str) -> bool:\n \"\"\"Lightweight extension gate for Docling-compatible types.\"\"\"\n docling_exts = (\n \".adoc\",\n \".asciidoc\",\n \".asc\",\n \".bmp\",\n \".csv\",\n \".dotx\",\n \".dotm\",\n \".docm\",\n \".docx\",\n \".htm\",\n \".html\",\n \".jpeg\",\n \".json\",\n \".md\",\n \".pdf\",\n \".png\",\n \".potx\",\n \".ppsx\",\n \".pptm\",\n \".potm\",\n \".ppsm\",\n \".pptx\",\n \".tiff\",\n \".txt\",\n \".xls\",\n \".xlsx\",\n \".xhtml\",\n \".xml\",\n \".webp\",\n )\n return file_path.lower().endswith(docling_exts)\n\n def _process_docling_in_subprocess(self, file_path: str) -> Data | None:\n \"\"\"Run Docling in a separate OS process and map the result to a Data object.\n\n We avoid multiprocessing pickling by launching `python -c \"
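The docstring names the isolation technique: spawn a fresh interpreter with `python -c` and pass results back as JSON over stdout, so no picklable objects or native-library state ever cross the process boundary, and any memory the parser grows is reclaimed when the child exits. A minimal sketch of that pattern under those assumptions; the worker body here is a placeholder for the real Docling conversion, not the component's actual script:

```python
import json
import subprocess
import sys
import textwrap


def run_isolated(file_path: str, timeout: int = 300) -> dict:
    """Execute a worker in a fresh interpreter; only JSON crosses the boundary."""
    worker = textwrap.dedent("""
        import json, sys
        # Placeholder for the real Docling conversion of sys.argv[1].
        result = {"file_path": sys.argv[1], "status": "success"}
        json.dump(result, sys.stdout)
    """)
    proc = subprocess.run(
        [sys.executable, "-c", worker, file_path],
        capture_output=True, text=True, timeout=timeout, check=False,
    )
    if proc.returncode != 0:
        return {"error": proc.stderr.strip(), "file_path": file_path}
    return json.loads(proc.stdout)


print(run_isolated("report.pdf"))
```

With `python -c script arg`, the script sees `sys.argv[1] == arg`, so the file path travels as a plain argv string; `timeout` plus the non-zero-exit branch give the parent a bounded, inspectable failure mode instead of a hung or crashed main process.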