29 changes: 25 additions & 4 deletions src/backend/base/langflow/base/agents/agent.py
@@ -9,7 +9,6 @@

from langflow.base.agents.callback import AgentAsyncHandler
from langflow.base.agents.events import ExceptionWithMessageError, process_agent_events
from langflow.base.agents.utils import data_to_messages
from langflow.custom.custom_component.component import Component, _get_component_toolkit
from langflow.field_typing import Tool
from langflow.inputs.inputs import InputTypes, MultilineInput
@@ -118,6 +117,21 @@ def get_chat_history_data(self) -> list[Data] | None:
# might be overridden in subclasses
return None

def _data_to_messages_skip_empty(self, data: list[Data]):
"""Convert data to messages, filtering only empty text while preserving non-text content."""
messages = []
for value in data:
# Only skip if the message has a text attribute that is empty/whitespace
text = getattr(value, "text", None)
if isinstance(text, str) and not text.strip():
# Skip only messages with empty/whitespace-only text strings
continue

Collaborator (author): If I don't do this, I get errors like:

    Details: An error occurred (ValidationException) when calling the ConverseStream operation: The content field in the Message object at messages.6 is empty. Add a ContentBlock object to the content field and try again.

Contributor: This is very weird. Maybe there's a langchain version problem causing this.

lc_message = value.to_lc_message()
messages.append(lc_message)

return messages
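
For context, a minimal sketch of the filter's behavior; the FakeData class below is a hypothetical stand-in for langflow's Data objects, which carry a text attribute:

from dataclasses import dataclass

@dataclass
class FakeData:
    # Hypothetical stand-in for langflow's Data; only the field used by the filter.
    text: str | None = None

history = [FakeData("hi"), FakeData("   "), FakeData(""), FakeData(None), FakeData("ok")]
# Same predicate as above: drop only entries whose text is an empty/whitespace string.
kept = [d for d in history if not (isinstance(d.text, str) and not d.text.strip())]
assert [d.text for d in kept] == ["hi", None, "ok"]  # None survives: it may carry non-text content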

async def run_agent(
self,
agent: Runnable | BaseSingleActionAgent | BaseMultiActionAgent | AgentExecutor,
@@ -143,14 +157,21 @@ async def run_agent(
input_dict["system_prompt"] = self.system_prompt
if hasattr(self, "chat_history") and self.chat_history:
if isinstance(self.chat_history, Data):
input_dict["chat_history"] = data_to_messages(self.chat_history)
input_dict["chat_history"] = self._data_to_messages_skip_empty(self.chat_history)
if all(isinstance(m, Message) for m in self.chat_history):
input_dict["chat_history"] = data_to_messages([m.to_data() for m in self.chat_history])
input_dict["chat_history"] = self._data_to_messages_skip_empty([m.to_data() for m in self.chat_history])
if hasattr(input_dict["input"], "content") and isinstance(input_dict["input"].content, list):
# ! Because the input has to be a string, we must pass the images in the chat_history

image_dicts = [item for item in input_dict["input"].content if item.get("type") == "image"]
input_dict["input"].content = [item for item in input_dict["input"].content if item.get("type") != "image"]
text_content = [item for item in input_dict["input"].content if item.get("type") != "image"]

# Ensure we don't create an empty content list
if text_content:
input_dict["input"].content = text_content
else:
# If no text content, convert to empty string to avoid empty message
input_dict["input"] = ""
Collaborator (author): This handles some empty-message errors I was seeing with Bedrock; it raises an error if the content is None.
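
A sketch of the intent, with content-block shapes assumed from the code above: images are moved out of the input content, and an empty remainder collapses to "" so providers like Bedrock never receive an empty content list.

content = [{"type": "image", "source": "..."}, {"type": "text", "text": "describe this"}]
image_dicts = [item for item in content if item.get("type") == "image"]
text_content = [item for item in content if item.get("type") != "image"]
# An empty text_content list would trigger the ValidationException quoted earlier.
final_input = text_content if text_content else ""
assert final_input == [{"type": "text", "text": "describe this"}]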


if "chat_history" not in input_dict:
input_dict["chat_history"] = []
65 changes: 46 additions & 19 deletions src/backend/base/langflow/base/agents/events.py
@@ -53,7 +53,11 @@


async def handle_on_chain_start(
event: dict[str, Any], agent_message: Message, send_message_method: SendMessageFunctionType, start_time: float
event: dict[str, Any],
agent_message: Message,
send_message_method: SendMessageFunctionType,
start_time: float,
had_streaming: bool = False,

GitHub Actions / Ruff Style Check (3.13): failures on line 60 in src/backend/base/langflow/base/agents/events.py
    ARG001 Unused function argument: `had_streaming`
    FBT002 Boolean default positional argument in function definition
    FBT001 Boolean-typed positional argument in function definition
) -> tuple[Message, float]:
# Create content blocks if they don't exist
if not agent_message.content_blocks:
@@ -98,30 +102,35 @@
if isinstance(item, str):
return item
if isinstance(item, dict):
# Check for text content first
if "text" in item:
return item["text"]
# If the item's type is "tool_use", return an empty string.
# This likely indicates that "tool_use" outputs are not meant to be displayed as text.
if item.get("type") == "tool_use":
return ""
if isinstance(item, dict):
if "text" in item:
return item["text"]
# If the item's type is "tool_use", return an empty string.
# This likely indicates that "tool_use" outputs are not meant to be displayed as text.
# Check for alternative text fields
if "content" in item:
return str(item["content"])
if "message" in item:
return str(item["message"])
# Handle special cases that should return empty string
if item.get("type") == "tool_use":
return ""
# This is a workaround to deal with function calling by Anthropic
# since the same data comes in the tool_output we don't need to stream it here
# although it would be nice to
if "partial_json" in item:
return ""
# Handle streaming metadata chunks that only contain index or other non-text fields
if "index" in item and "text" not in item:
return ""
# Handle other metadata-only chunks that don't contain meaningful text
if not any(key in item for key in ["text", "content", "message"]):
return ""
msg = f"Output is not a string or list of dictionaries with 'text' key: {output}"
raise TypeError(msg)
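
Sketch of the normalization this helper performs on a single content block; the block shapes are assumed from the branches above (Anthropic-style tool_use and partial_json blocks yield no display text):

def _extract_block_text(item) -> str:
    # Simplified re-implementation for illustration only.
    if isinstance(item, str):
        return item
    if "text" in item:
        return item["text"]
    if "content" in item:
        return str(item["content"])
    if "message" in item:
        return str(item["message"])
    # tool_use, partial_json, and metadata-only chunks such as {"index": 0} -> ""
    return ""

assert _extract_block_text({"text": "hello"}) == "hello"
assert _extract_block_text({"type": "tool_use", "id": "t1"}) == ""
assert _extract_block_text({"index": 0}) == ""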


async def handle_on_chain_end(
event: dict[str, Any], agent_message: Message, send_message_method: SendMessageFunctionType, start_time: float
event: dict[str, Any],
agent_message: Message,
send_message_method: SendMessageFunctionType,
start_time: float,
had_streaming: bool = False,

GitHub Actions / Ruff Style Check (3.13): failures on line 133 in src/backend/base/langflow/base/agents/events.py
    FBT002 Boolean default positional argument in function definition
    FBT001 Boolean-typed positional argument in function definition
) -> tuple[Message, float]:
data_output = event["data"].get("output")
if data_output and isinstance(data_output, AgentFinish) and data_output.return_values.get("output"):
@@ -139,7 +148,11 @@
header={"title": "Output", "icon": "MessageSquare"},
)
agent_message.content_blocks[0].contents.append(text_content)
agent_message = await send_message_method(message=agent_message)

# Only send final message if we didn't have streaming chunks
# If we had streaming, frontend already accumulated the chunks
if not had_streaming:
agent_message = await send_message_method(message=agent_message)
start_time = perf_counter()
return agent_message, start_time
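
A condensed view of the gate added here, assuming the frontend has already accumulated the streamed chunks:

async def maybe_send_final(agent_message, send_message_method, *, had_streaming: bool):
    # When chunks were streamed, the client already holds the full text;
    # re-sending the finished message would duplicate it.
    if not had_streaming:
        agent_message = await send_message_method(message=agent_message)
    return agent_message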

@@ -256,6 +269,7 @@
agent_message: Message,
send_message_method: SendMessageFunctionType,
start_time: float,
had_streaming: bool = False,

GitHub Actions / Ruff Style Check (3.13): failures on line 272 in src/backend/base/langflow/base/agents/events.py
    ARG001 Unused function argument: `had_streaming`
    FBT002 Boolean default positional argument in function definition
    FBT001 Boolean-typed positional argument in function definition
) -> tuple[Message, float]:
data_chunk = event["data"].get("chunk", {})
if isinstance(data_chunk, dict) and data_chunk.get("output"):
@@ -267,11 +281,11 @@
start_time = perf_counter()
elif isinstance(data_chunk, AIMessageChunk):
output_text = _extract_output_text(data_chunk.content)
if output_text and isinstance(agent_message.text, str):
agent_message.text += output_text
if output_text and output_text.strip():
# For streaming, send only the chunk (not accumulated text)
agent_message.text = output_text
agent_message.properties.state = "partial"
agent_message = await send_message_method(message=agent_message)
if not agent_message.text:
Contributor: Why remove this?

Contributor: `if not agent_message.text:` was here to only start the time counter if the message had just started being generated.

start_time = perf_counter()
return agent_message, start_time
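
Taken together with the store change below, this defines a new streaming contract: the backend sends each partial message carrying only the latest chunk, and the frontend accumulates. A sketch of the round trip, with the send/accumulate steps assumed from the diffs in this PR:

chunks = ["Hello", " world", "!"]
accumulated = ""
for chunk in chunks:
    # backend: agent_message.text = chunk; await send_message_method(message=agent_message)
    # frontend: updateMessageText(id, chunk) appends the chunk to the stored text
    accumulated += chunk
assert accumulated == "Hello world!"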

@@ -330,6 +344,8 @@
try:
# Create a mapping of run_ids to tool contents
tool_blocks_map: dict[str, ToolContent] = {}
# Track if we had streaming events
had_streaming = False
start_time = perf_counter()
async for event in agent_executor:
if event["event"] in TOOL_EVENT_HANDLERS:
@@ -339,7 +355,18 @@
)
elif event["event"] in CHAIN_EVENT_HANDLERS:
chain_handler = CHAIN_EVENT_HANDLERS[event["event"]]
agent_message, start_time = await chain_handler(event, agent_message, send_message_method, start_time)
# Check if this is a streaming event
if event["event"] in ("on_chain_stream", "on_chat_model_stream"):
had_streaming = True
# Pass had_streaming parameter for chain_end events
if event["event"] == "on_chain_end":
agent_message, start_time = await chain_handler(
event, agent_message, send_message_method, start_time, had_streaming
)
else:
agent_message, start_time = await chain_handler(
event, agent_message, send_message_method, start_time
)
agent_message.properties.state = "complete"
except Exception as e:
raise ExceptionWithMessageError(agent_message, str(e)) from e

Large diffs are not rendered by default.

60 changes: 60 additions & 0 deletions src/backend/tests/unit/components/agents/test_agent_events.py
@@ -3,6 +3,7 @@
from unittest.mock import AsyncMock

from langchain_core.agents import AgentFinish
from langchain_core.messages import AIMessageChunk
from langflow.base.agents.agent import process_agent_events
from langflow.base.agents.events import (
handle_on_chain_end,
@@ -541,3 +542,62 @@ async def test_handle_on_chain_stream_no_output():
assert updated_message.text == ""
assert updated_message.properties.state == "partial"
assert isinstance(start_time, float)


async def test_agent_streaming_no_text_accumulation():
"""Test that agent streaming sends individual chunks without accumulating text."""
sent_messages = []

async def mock_send_message(message):
# Capture each message sent for verification
sent_messages.append(
{"text": message.text, "state": message.properties.state, "id": getattr(message, "id", None)}
)
return message

agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)

# Simulate streaming events with individual chunks
events = [
{
"event": "on_chain_stream",
"data": {"chunk": AIMessageChunk(content="Hello")},
},
{
"event": "on_chain_stream",
"data": {"chunk": AIMessageChunk(content=" world")},
},
{
"event": "on_chain_stream",
"data": {"chunk": AIMessageChunk(content="!")},
},
{
"event": "on_chain_end",
"data": {"output": AgentFinish(return_values={"output": "Hello world!"}, log="complete")},
},
]

result = await process_agent_events(create_event_iterator(events), agent_message, mock_send_message)

# Verify individual chunks were sent (not accumulated)
streaming_messages = [msg for msg in sent_messages if msg["state"] == "partial"]
assert len(streaming_messages) == 3, f"Expected 3 streaming messages, got {len(streaming_messages)}"

# Each streaming message should contain only its chunk, not accumulated text
assert streaming_messages[0]["text"] == "Hello"
assert streaming_messages[1]["text"] == " world"
assert streaming_messages[2]["text"] == "!"

# Verify no streaming message contains accumulated text
for msg in streaming_messages:
assert "Hello world!" not in msg["text"], f"Found accumulated text in chunk: {msg['text']}"

# Final result should have complete message
assert result.properties.state == "complete"
assert result.text == "Hello world!"
@@ -225,8 +225,9 @@ export default function ChatView({

const isStreamingUpdate =
!lastMessage.isSend &&
currentMessageContent &&
currentMessageContent !== lastMessageContent &&
currentMessageContent.length > lastMessageContent.length;
currentMessageContent.length > (lastMessageContent?.length || 0);

if (isStreamingUpdate) {
if (!isLlmResponding) {
14 changes: 12 additions & 2 deletions src/frontend/src/stores/messagesStore.ts
@@ -18,7 +18,17 @@ export const useMessagesStore = create<MessagesStoreType>((set, get) => ({
addMessage: (message) => {
const existingMessage = get().messages.find((msg) => msg.id === message.id);
if (existingMessage) {
get().updateMessagePartial(message);
// Check if this is a streaming partial message (state: "partial")
if (message.properties?.state === "partial") {
// For streaming, accumulate the text content
get().updateMessageText(message.id, message.text || "");
// Update other properties but preserve accumulated text
const { text, ...messageWithoutText } = message;
get().updateMessagePartial(messageWithoutText);
} else {
// For complete messages, replace entirely
get().updateMessagePartial(message);
}
return;
}
if (message.sender === "Machine") {
@@ -59,7 +69,7 @@ export const useMessagesStore = create<MessagesStoreType>((set, get) => ({
if (state.messages[i].id === id) {
updatedMessages[i] = {
...updatedMessages[i],
text: updatedMessages[i].text + chunk,
text: (updatedMessages[i].text || "") + chunk,
};
break;
}