diff --git a/src/backend/base/langflow/components/helpers/memory.py b/src/backend/base/langflow/components/helpers/memory.py index 3f3ceb6226b6..8ef9c516f169 100644 --- a/src/backend/base/langflow/components/helpers/memory.py +++ b/src/backend/base/langflow/components/helpers/memory.py @@ -19,10 +19,10 @@ class MemoryComponent(Component): documentation: str = "https://docs.langflow.org/components-helpers#message-history" icon = "message-square-more" name = "Memory" - default_keys = ["mode", "memory"] + default_keys = ["mode", "memory", "session_id"] mode_config = { "Store": ["message", "memory", "sender", "sender_name", "session_id"], - "Retrieve": ["n_messages", "order", "template", "memory"], + "Retrieve": ["n_messages", "order", "template", "memory", "session_id"], } inputs = [ diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json b/src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json index 3b3a634d6e75..152d15870cbf 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json @@ -287,7 +287,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from typing import Any, cast\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom langflow.memory import aget_messages, astore_message\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.schema.message import Message\nfrom langflow.template.field.base import Output\nfrom langflow.utils.component_utils import set_current_fields, set_field_display\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/components-helpers#message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id, sender_name=message.sender_name, sender=message.sender\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n" + "value": "from typing import Any, cast\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom langflow.memory import aget_messages, astore_message\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.schema.message import Message\nfrom langflow.template.field.base import Output\nfrom langflow.utils.component_utils import set_current_fields, set_field_display\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/components-helpers#message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\", \"session_id\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\", \"session_id\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id, sender_name=message.sender_name, sender=message.sender\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n" }, "memory": { "_input_type": "HandleInput", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json b/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json index 02b3f9df93b3..8ede37ca887c 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json @@ -1505,7 +1505,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from typing import Any, cast\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom langflow.memory import aget_messages, astore_message\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.schema.message import Message\nfrom langflow.template.field.base import Output\nfrom langflow.utils.component_utils import set_current_fields, set_field_display\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/components-helpers#message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id, sender_name=message.sender_name, sender=message.sender\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n" + "value": "from typing import Any, cast\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom langflow.memory import aget_messages, astore_message\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.schema.message import Message\nfrom langflow.template.field.base import Output\nfrom langflow.utils.component_utils import set_current_fields, set_field_display\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/components-helpers#message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\", \"session_id\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\", \"session_id\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id, sender_name=message.sender_name, sender=message.sender\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n" }, "memory": { "_input_type": "HandleInput", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json b/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json index cc89c0b789f7..c73fe6aed352 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json @@ -855,7 +855,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from typing import Any, cast\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom langflow.memory import aget_messages, astore_message\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.schema.message import Message\nfrom langflow.template.field.base import Output\nfrom langflow.utils.component_utils import set_current_fields, set_field_display\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/components-helpers#message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id, sender_name=message.sender_name, sender=message.sender\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n" + "value": "from typing import Any, cast\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom langflow.memory import aget_messages, astore_message\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.dotdict import dotdict\nfrom langflow.schema.message import Message\nfrom langflow.template.field.base import Output\nfrom langflow.utils.component_utils import set_current_fields, set_field_display\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/components-helpers#message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\", \"session_id\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\", \"session_id\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. \"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id, sender_name=message.sender_name, sender=message.sender\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n" }, "memory": { "_input_type": "HandleInput", diff --git a/src/backend/tests/unit/components/processing/test_structured_output_component.py b/src/backend/tests/unit/components/processing/test_structured_output_component.py index 33ab6a3407d2..055e485512ad 100644 --- a/src/backend/tests/unit/components/processing/test_structured_output_component.py +++ b/src/backend/tests/unit/components/processing/test_structured_output_component.py @@ -252,8 +252,8 @@ def model_dump(self, **__): mock_get_chat_result.assert_called_once() @pytest.mark.skipif( - "OPENAI_API_KEY" not in os.environ, - reason="OPENAI_API_KEY environment variable not set", + not (os.getenv("OPENAI_API_KEY") or "").strip(), + reason="OPENAI_API_KEY is not set or is empty", ) def test_with_real_openai_model_simple_schema(self): # Create a real OpenAI model @@ -284,7 +284,7 @@ def test_with_real_openai_model_simple_schema(self): assert result[0]["age"] == 30 @pytest.mark.skipif( - "OPENAI_API_KEY" not in os.environ, + not (os.getenv("OPENAI_API_KEY") or "").strip(), reason="OPENAI_API_KEY environment variable not set", ) def test_with_real_openai_model_multiple_patterns(self): @@ -397,7 +397,7 @@ def model_dump(self, **__): assert 899.99 in prices @pytest.mark.skipif( - "OPENAI_API_KEY" not in os.environ, + not (os.getenv("OPENAI_API_KEY") or "").strip(), reason="OPENAI_API_KEY environment variable not set", ) def test_with_real_openai_model_simple_schema_fail(self): @@ -433,7 +433,7 @@ def test_with_real_openai_model_simple_schema_fail(self): ), f"Expected max_tokens error but got: {error_message}" @pytest.mark.skipif( - "OPENAI_API_KEY" not in os.environ, + not (os.getenv("OPENAI_API_KEY") or "").strip(), reason="OPENAI_API_KEY environment variable not set", ) def test_with_real_openai_model_complex_schema(self): @@ -482,7 +482,7 @@ def test_with_real_openai_model_complex_schema(self): assert result[0]["rating"] == 4.0 @pytest.mark.skipif( - "OPENAI_API_KEY" not in os.environ, + not (os.getenv("OPENAI_API_KEY") or "").strip(), reason="OPENAI_API_KEY environment variable not set", ) def test_with_real_openai_model_nested_schema(self): @@ -547,7 +547,7 @@ def test_with_real_openai_model_nested_schema(self): assert result[0]["would_return"] is True @pytest.mark.skipif( - "NVIDIA_API_KEY" not in os.environ, + not (os.getenv("NVIDIA_API_KEY") or "").strip(), reason="NVIDIA_API_KEY environment variable not set", ) def test_with_real_nvidia_model_simple_schema(self):