Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ LANGFLOW_SAVE_DB_IN_CONFIG_DIR=
# SQLite example:
LANGFLOW_DATABASE_URL=sqlite:///./langflow.db

# mem0 creates a directory for chat history, vector stores, and other artifacts.
# Its default path is ~/.mem0; override it with the "MEM0_DIR" environment variable.
# Example: MEM0_DIR=/tmp/.mem0

# Composio creates a cache directory for file uploads/downloads.
# Its default path is ~/.composio; override it with the "COMPOSIO_CACHE_DIR" environment variable.
# Example: COMPOSIO_CACHE_DIR=/tmp/.composio

# Database connection retry
# Values: true, false
# If true, Langflow will retry connecting to the database when the connection fails
Expand Down
30 changes: 30 additions & 0 deletions src/backend/base/langflow/base/composio/composio_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.utils.validate import check_s3_storage_mode

disabled_in_cloud_msg = (
"Composio is not supported in S3/cloud mode. "
"Please use local storage mode."
)

def _patch_graph_clean_null_input_types() -> None:
"""Monkey-patch Graph._create_vertex to clean legacy templates."""
Expand Down Expand Up @@ -59,6 +64,7 @@ def _create_vertex_with_cleanup(self, frontend_data):
_patch_graph_clean_null_input_types()



class ComposioBaseComponent(Component):
"""Base class for Composio components with common functionality."""

Expand Down Expand Up @@ -114,6 +120,7 @@ class ComposioBaseComponent(Component):

def __init__(self, **kwargs):
"""Initialize instance variables to prevent shared state between components."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
super().__init__(**kwargs)
self._all_fields: set[str] = set()
self._bool_variables: set[str] = set()
Expand All @@ -125,12 +132,14 @@ def __init__(self, **kwargs):
self._action_schemas: dict[str, Any] = {}

def as_message(self) -> Message:
    """Run the selected action and wrap its result in a Message."""
    # Composio is disabled when Langflow runs against S3 storage; fail fast.
    check_s3_storage_mode(disabled_in_cloud_msg)
    outcome = self.execute_action()
    text = "Action execution returned no result" if outcome is None else str(outcome)
    return Message(text=text)

def as_dataframe(self) -> DataFrame:
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
result = self.execute_action()

if isinstance(result, dict):
Expand All @@ -146,11 +155,13 @@ def as_dataframe(self) -> DataFrame:
return result_dataframe

def as_data(self) -> Data:
    """Run the selected action and wrap its raw result in a Data object."""
    # Composio is disabled when Langflow runs against S3 storage; fail fast.
    check_s3_storage_mode(disabled_in_cloud_msg)
    return Data(results=self.execute_action())

def _build_action_maps(self):
"""Build lookup maps for action names."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if not self._display_to_key_map or not self._key_to_display_map:
self._display_to_key_map = {data["display_name"]: key for key, data in self._actions_data.items()}
self._key_to_display_map = {key: data["display_name"] for key, data in self._actions_data.items()}
Expand All @@ -161,22 +172,26 @@ def _build_action_maps(self):

def sanitize_action_name(self, action_name: str) -> str:
    """Translate an action key into its display name (falls back to the key itself)."""
    check_s3_storage_mode(disabled_in_cloud_msg)  # raises in S3/cloud mode
    self._build_action_maps()
    lookup = self._key_to_display_map
    return lookup.get(action_name, action_name)

def desanitize_action_name(self, action_name: str) -> str:
    """Translate a display name back into its action key (falls back to the input)."""
    check_s3_storage_mode(disabled_in_cloud_msg)  # raises in S3/cloud mode
    self._build_action_maps()
    lookup = self._display_to_key_map
    return lookup.get(action_name, action_name)

def _get_action_fields(self, action_key: str | None) -> set[str]:
    """Return the input fields registered for *action_key*; empty set when unknown."""
    check_s3_storage_mode(disabled_in_cloud_msg)  # raises in S3/cloud mode
    if action_key is None or action_key not in self._actions_data:
        return set()
    return set(self._actions_data[action_key]["action_fields"])

def _build_wrapper(self) -> Composio:
"""Build the Composio wrapper."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
try:
if not self.api_key:
msg = "Composio API Key is required"
Expand Down Expand Up @@ -219,6 +234,7 @@ def show_hide_fields(self, build_config: dict, field_value: Any):

def _populate_actions_data(self):
"""Fetch the list of actions for the toolkit and build helper maps."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if self._actions_data:
return

Expand Down Expand Up @@ -440,6 +456,7 @@ def _populate_actions_data(self):

def _validate_schema_inputs(self, action_key: str) -> list[InputTypes]:
"""Convert the JSON schema for *action_key* into Langflow input objects."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
# Skip validation for default/placeholder values
if action_key in ("disabled", "placeholder", ""):
logger.debug(f"Skipping schema validation for placeholder value: {action_key}")
Expand Down Expand Up @@ -669,6 +686,7 @@ def _validate_schema_inputs(self, action_key: str) -> list[InputTypes]:

def _get_inputs_for_all_actions(self) -> dict[str, list[InputTypes]]:
"""Return a mapping action_key → list[InputTypes] for every action."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
result: dict[str, list[InputTypes]] = {}
for key in self._actions_data:
result[key] = self._validate_schema_inputs(key)
Expand All @@ -687,6 +705,7 @@ def _remove_inputs_from_build_config(self, build_config: dict, keep_for_action:

def _update_action_config(self, build_config: dict, selected_value: Any) -> None:
"""Add or update parameter input fields for the chosen action."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if not selected_value:
return

Expand Down Expand Up @@ -733,6 +752,7 @@ def _is_tool_mode_enabled(self) -> bool:

def _set_action_visibility(self, build_config: dict, *, force_show: bool | None = None) -> None:
"""Set action field visibility based on tool_mode state or forced value."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if force_show is not None:
build_config["action_button"]["show"] = force_show
else:
Expand All @@ -741,12 +761,14 @@ def _set_action_visibility(self, build_config: dict, *, force_show: bool | None

def create_new_auth_config(self, app_name: str) -> str:
    """Create a Composio-managed auth config for *app_name* and return its id."""
    check_s3_storage_mode(disabled_in_cloud_msg)  # raises in S3/cloud mode
    client = self._build_wrapper()
    options = {"type": "use_composio_managed_auth"}
    return client.auth_configs.create(toolkit=app_name, options=options).id

def _initiate_connection(self, app_name: str) -> tuple[str, str]:
"""Initiate OAuth connection and return (redirect_url, connection_id)."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
try:
composio = self._build_wrapper()

Expand Down Expand Up @@ -786,6 +808,7 @@ def _initiate_connection(self, app_name: str) -> tuple[str, str]:

def _check_connection_status_by_id(self, connection_id: str) -> str | None:
"""Check status of a specific connection by ID. Returns status or None if not found."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
try:
composio = self._build_wrapper()
connection = composio.connected_accounts.get(nanoid=connection_id)
Expand All @@ -799,6 +822,7 @@ def _check_connection_status_by_id(self, connection_id: str) -> str | None:

def _find_active_connection_for_app(self, app_name: str) -> tuple[str, str] | None:
"""Find any ACTIVE connection for this app/user. Returns (connection_id, status) or None."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
try:
composio = self._build_wrapper()
connection_list = composio.connected_accounts.list(
Expand All @@ -821,6 +845,7 @@ def _find_active_connection_for_app(self, app_name: str) -> tuple[str, str] | No

def _disconnect_specific_connection(self, connection_id: str) -> None:
"""Disconnect a specific Composio connection by ID."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
try:
composio = self._build_wrapper()
composio.connected_accounts.delete(nanoid=connection_id)
Expand All @@ -833,6 +858,7 @@ def _disconnect_specific_connection(self, connection_id: str) -> None:

def update_build_config(self, build_config: dict, field_value: Any, field_name: str | None = None) -> dict:
"""Update build config for auth and action selection."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
# Clean any legacy None values that may still be present
for _fconfig in build_config.values():
if isinstance(_fconfig, dict) and _fconfig.get("input_types") is None:
Expand Down Expand Up @@ -1187,6 +1213,7 @@ def enabled_tools(self):
to prevent overwhelming the agent. Subclasses can override this behavior.

"""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if not self._actions_data:
self._populate_actions_data()

Expand All @@ -1199,6 +1226,7 @@ def enabled_tools(self):

def execute_action(self):
"""Execute the selected Composio tool."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
composio = self._build_wrapper()
self._populate_actions_data()
self._build_action_maps()
Expand Down Expand Up @@ -1282,6 +1310,7 @@ def execute_action(self):

def _apply_post_processor(self, action_key: str, raw_data: Any) -> Any:
"""Apply post-processor for the given action if defined."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if hasattr(self, "post_processors") and isinstance(self.post_processors, dict):
processor_func = self.post_processors.get(action_key)
if processor_func and callable(processor_func):
Expand All @@ -1295,3 +1324,4 @@ def _apply_post_processor(self, action_key: str, raw_data: Any) -> Any:

def set_default_tools(self):
    """Set the default tools.

    Base implementation is intentionally a no-op hook for subclasses to
    override; it only verifies that Composio is allowed under the current
    storage mode before doing nothing.
    """
    check_s3_storage_mode(disabled_in_cloud_msg)  # ensure we're not in S3 storage mode, otherwise raise an error
11 changes: 10 additions & 1 deletion src/backend/base/langflow/components/composio/composio_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
SortableListInput,
)
from langflow.io import Output
from langflow.utils.validate import check_s3_storage_mode

disabled_in_cloud_msg = (
"Composio is not supported in S3/cloud mode. "
"Please use local storage mode."
)

# TODO: We get the list from the API but we need to filter it
enabled_tools = ["confluence", "discord", "dropbox", "github", "gmail", "linkedin", "notion", "slack", "youtube"]


class ComposioAPIComponent(LCToolComponent):
display_name: str = "Composio Tools"
description: str = "Use Composio toolset to run actions with your agent"
Expand Down Expand Up @@ -71,6 +76,7 @@ class ComposioAPIComponent(LCToolComponent):

def validate_tool(self, build_config: dict, field_value: Any, tool_name: str | None = None) -> dict:
# Get the index of the selected tool in the list of options
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
selected_tool_index = next(
(
ind
Expand Down Expand Up @@ -120,6 +126,7 @@ def validate_tool(self, build_config: dict, field_value: Any, tool_name: str | N
return build_config

def update_build_config(self, build_config: dict, field_value: Any, field_name: str | None = None) -> dict:
check_s3_storage_mode(disabled_in_cloud_msg) # check if we're in S3 storage mode, otherwise raise an error
if field_name == "api_key" or (self.api_key and not build_config["tool_name"]["options"]):
if field_name == "api_key" and not field_value:
build_config["tool_name"]["options"] = []
Expand Down Expand Up @@ -229,6 +236,7 @@ def build_tool(self) -> Sequence[Tool]:
Returns:
Sequence[Tool]: List of configured Composio tools.
"""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
composio = self._build_wrapper()
action_names = [action["name"] for action in self.actions]

Expand Down Expand Up @@ -257,6 +265,7 @@ def _build_wrapper(self) -> Composio:
Raises:
ValueError: If the API key is not found or invalid.
"""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
try:
if not self.api_key:
msg = "Composio API Key is required"
Expand Down
9 changes: 9 additions & 0 deletions src/backend/base/langflow/components/mem0/mem0_chat_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from langflow.io import Output
from langflow.logging.logger import logger
from langflow.schema.data import Data
from langflow.utils.validate import check_s3_storage_mode

disabled_in_cloud_msg = (
"Mem0 memory is not supported in S3/cloud mode. "
"Mem0 memory requires local file system access for persistence. "
"Please use local storage mode."
)

class Mem0MemoryComponent(LCChatMemoryComponent):
display_name = "Mem0 Chat Memory"
Expand Down Expand Up @@ -80,6 +86,7 @@ class Mem0MemoryComponent(LCChatMemoryComponent):

def build_mem0(self) -> Memory:
"""Initializes a Mem0 memory instance based on provided configuration and API keys."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
if self.openai_api_key:
os.environ["OPENAI_API_KEY"] = self.openai_api_key

Expand All @@ -95,6 +102,7 @@ def build_mem0(self) -> Memory:

def ingest_data(self) -> Memory:
"""Ingests a new message into Mem0 memory and returns the updated memory instance."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
mem0_memory = self.existing_memory or self.build_mem0()

if not self.ingest_message or not self.user_id:
Expand All @@ -115,6 +123,7 @@ def ingest_data(self) -> Memory:

def build_search_results(self) -> Data:
"""Searches the Mem0 memory for related messages based on the search query and returns the results."""
check_s3_storage_mode(disabled_in_cloud_msg) # ensure we're not in S3 storage mode, otherwise raise an error
mem0_memory = self.ingest_data()
search_query = self.search_query
user_id = self.user_id
Expand Down
19 changes: 8 additions & 11 deletions src/backend/base/langflow/components/vectorstores/local_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.template.field.base import Output
from langflow.utils.validate import check_s3_storage_mode

disabled_in_cloud_msg = (
"Local DB is not supported in S3/cloud mode. "
"Local DB requires local file system access for persistence. "
"Please use local storage mode."
)

class LocalDBComponent(LCVectorStoreComponent):
"""Chroma Vector Store with search capabilities."""
Expand Down Expand Up @@ -194,17 +200,8 @@ def update_build_config(self, build_config: dict, field_value: str, field_name:
@check_cached_vector_store
def build_vector_store(self) -> Chroma:
"""Builds the Chroma object."""
# Check if we're in S3 mode - local vector stores not supported in cloud
from langflow.services.deps import get_settings_service

settings = get_settings_service().settings
if settings.storage_type == "s3":
msg = (
"Local vector stores are not supported in S3/cloud mode. "
"Local vector stores require local file system access for persistence. "
"Please use cloud-based vector stores (Pinecone, Weaviate, etc.) or local storage mode."
)
raise ValueError(msg)
# ensure we're not in S3 storage mode, otherwise raise an error
check_s3_storage_mode(disabled_in_cloud_msg)

try:
from langchain_chroma import Chroma
Expand Down
15 changes: 15 additions & 0 deletions src/backend/base/langflow/utils/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES, DEFAULT_IMPORT_STRING
from langflow.logging.logger import logger
from langflow.services.deps import get_settings_service


def add_type_ignores() -> None:
Expand Down Expand Up @@ -487,3 +488,17 @@ def extract_class_name(code: str) -> str:
except SyntaxError as e:
msg = f"Invalid Python code: {e!s}"
raise ValueError(msg) from e


def check_s3_storage_mode(msg: str) -> None:
    """Raise an error if Langflow is running in S3 storage mode.

    Used to disable certain components in the cloud. Cloud use is
    identified by ``storage_type == "s3"`` in the settings service.

    Args:
        msg: The message to raise if we're in S3 storage mode.

    Raises:
        ValueError: If the configured storage type is "s3".
    """
    settings = get_settings_service().settings
    if settings.storage_type == "s3":
        raise ValueError(msg)
Loading