Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a703de2
feat: Introduce telemetry tracking for sensitive field types
ogabrielluiz Oct 6, 2025
369c9ec
feat: Enhance telemetry payloads with additional fields and serializa…
ogabrielluiz Oct 6, 2025
1dd2398
feat: Implement telemetry input tracking and caching
ogabrielluiz Oct 6, 2025
3978a49
feat: Add logging for component input telemetry
ogabrielluiz Oct 6, 2025
e2ba2f7
feat: Enhance telemetry logging for component execution
ogabrielluiz Oct 6, 2025
cf7710e
feat: Extend telemetry payload tests and enhance serialization
ogabrielluiz Oct 6, 2025
fbaedfc
Merge branch 'main' into per-component-telemetry-payload
ogabrielluiz Oct 13, 2025
53e5f73
fix: Update default telemetry tracking behavior in BaseInputMixin
ogabrielluiz Oct 13, 2025
0b599c7
fix: Update telemetry tracking defaults for input types
ogabrielluiz Oct 13, 2025
ebb39fb
feat: add chunk_index and total_chunks fields to ComponentInputsPayload
ogabrielluiz Oct 13, 2025
a77f14e
refactor: update ComponentInputsPayload to support automatic splittin…
ogabrielluiz Oct 13, 2025
35f70c0
refactor: enhance log_package_component_inputs to handle oversized pa…
ogabrielluiz Oct 13, 2025
0823bdf
refactor: centralize maximum telemetry URL size constant
ogabrielluiz Oct 13, 2025
cbe0739
refactor: update ComponentInputsPayload tests to use dictionary inputs
ogabrielluiz Oct 13, 2025
e2b1f60
test: add integration tests for telemetry service payload splitting
ogabrielluiz Oct 13, 2025
b49b328
test: enhance ComponentInputsPayload tests with additional scenarios
ogabrielluiz Oct 13, 2025
2594e23
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 13, 2025
8098fff
optimize query param encoding
ogabrielluiz Oct 13, 2025
da23b6a
refactor: extract telemetry logging logic into a separate function
ogabrielluiz Oct 13, 2025
4835413
refactor: optimize truncation logic in ComponentInputsPayload
ogabrielluiz Oct 13, 2025
1198b88
refactor: update telemetry tracking logic to respect opt-in flag
ogabrielluiz Oct 13, 2025
735e157
refactor: update tests to use dictionary format for component inputs
ogabrielluiz Oct 13, 2025
d4ffdd7
Merge branch 'main' into per-component-telemetry-payload
ogabrielluiz Oct 13, 2025
2f9a66a
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 13, 2025
fe2933a
refactor: specify type for current_chunk_inputs in ComponentInputsPay…
ogabrielluiz Oct 14, 2025
eed11ca
test: add component_id to ComponentPayload tests
ogabrielluiz Oct 14, 2025
762f4b4
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 14, 2025
fe6156e
feat: add component_id to ComponentPayload in build_vertex function
ogabrielluiz Oct 14, 2025
a58d57c
fix: update MAX_TELEMETRY_URL_SIZE to 2048 and adjust related tests
ogabrielluiz Oct 14, 2025
7e0f690
Merge branch 'main' into per-component-telemetry-payload
ogabrielluiz Oct 24, 2025
fab4857
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 24, 2025
65af5a2
merge main
ogabrielluiz Nov 14, 2025
91d9977
feat(telemetry): add track_in_telemetry field to starter project conf…
ogabrielluiz Nov 14, 2025
61b828e
refactor(telemetry): remove unused blank line in test imports
ogabrielluiz Nov 14, 2025
1187286
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 14, 2025
8014b80
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 14, 2025
8468260
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Nov 14, 2025
e77495c
merge main
ogabrielluiz Nov 14, 2025
8480e6d
update starter templates
ogabrielluiz Nov 14, 2025
21b924b
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 14, 2025
d8ce0d4
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 14, 2025
cab1687
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Nov 14, 2025
db6eb6e
Merge branch 'main' into per-component-telemetry-payload
ogabrielluiz Nov 14, 2025
0604d60
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 14, 2025
7a5cfd6
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,17 @@
"line_number": 36,
"is_secret": false
}
],
"src/lfx/src/lfx/inputs/input_mixin.py": [
{
"type": "Secret Keyword",
"filename": "src/lfx/src/lfx/inputs/input_mixin.py",
"hashed_secret": "3442496b96dd01591a8cd44b1eec1368ab728aba",
"is_verified": false,
"line_number": 21,
"is_secret": false
}
]
},
"generated_at": "2025-10-07T17:37:54Z"
"generated_at": "2025-10-13T18:52:52Z"
}
48 changes: 47 additions & 1 deletion src/backend/base/langflow/api/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,33 @@
from langflow.services.database.models.flow.model import Flow
from langflow.services.deps import get_chat_service, get_telemetry_service, session_scope
from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload
from langflow.services.telemetry.schema import (
ComponentInputsPayload,
ComponentPayload,
PlaygroundPayload,
)


def _log_component_input_telemetry(
    vertex,
    vertex_id: str,
    component_run_id: str,
    background_tasks: BackgroundTasks,
    telemetry_service,
) -> None:
    """Schedule a telemetry task carrying the component's input values.

    Nothing is scheduled when the vertex has no (truthy) ``custom_component``
    or when that component reports no telemetry input values.

    Args:
        vertex: Graph vertex whose ``custom_component`` is inspected.
        vertex_id: Vertex identifier; the text before its first ``-`` is sent
            as the component name.
        component_run_id: Identifier attached to the payload for this execution.
        background_tasks: Task collection the logging call is added to.
        telemetry_service: Service providing ``log_package_component_inputs``.
    """
    component = getattr(vertex, "custom_component", None)
    if not component:
        return

    tracked_inputs = component.get_telemetry_input_values()
    if not tracked_inputs:
        return

    log_inputs = telemetry_service.log_package_component_inputs
    inputs_payload = ComponentInputsPayload(
        component_run_id=component_run_id,
        component_id=vertex_id,
        component_name=vertex_id.split("-")[0],
        component_inputs=tracked_inputs,
    )
    background_tasks.add_task(log_inputs, inputs_payload)


async def start_flow_build(
Expand Down Expand Up @@ -286,6 +312,7 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
top_level_vertices = []
start_time = time.perf_counter()
error_message = None

try:
vertex = graph.get_vertex(vertex_id)
try:
Expand Down Expand Up @@ -372,23 +399,42 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
id=vertex.id,
data=result_data_response,
)

# Generate run_id for this component execution
component_run_id = str(uuid.uuid4())

# Extract and send component input telemetry (separate payload)
_log_component_input_telemetry(vertex, vertex_id, component_run_id, background_tasks, telemetry_service)

# Send component execution telemetry
background_tasks.add_task(
telemetry_service.log_package_component,
ComponentPayload(
component_name=vertex_id.split("-")[0],
component_id=vertex_id,
component_seconds=int(time.perf_counter() - start_time),
component_success=valid,
component_error_message=error_message,
component_run_id=component_run_id,
),
)
except Exception as exc:
# Generate run_id for this component execution (error case)
component_run_id = str(uuid.uuid4())

# Extract and send component input telemetry even on error (separate payload)
_log_component_input_telemetry(vertex, vertex_id, component_run_id, background_tasks, telemetry_service)

# Send component execution telemetry (error case)
background_tasks.add_task(
telemetry_service.log_package_component,
ComponentPayload(
component_name=vertex_id.split("-")[0],
component_id=vertex_id,
component_seconds=int(time.perf_counter() - start_time),
component_success=False,
component_error_message=str(exc),
component_run_id=component_run_id,
),
)
await logger.aexception("Error building Component")
Expand Down
2 changes: 2 additions & 0 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ async def build_vertex(
telemetry_service.log_package_component,
ComponentPayload(
component_name=vertex_id.split("-")[0],
component_id=vertex_id,
component_seconds=int(time.perf_counter() - start_time),
component_success=valid,
component_error_message=error_message,
Expand All @@ -390,6 +391,7 @@ async def build_vertex(
telemetry_service.log_package_component,
ComponentPayload(
component_name=vertex_id.split("-")[0],
component_id=vertex_id,
component_seconds=int(time.perf_counter() - start_time),
component_success=False,
component_error_message=str(exc),
Expand Down
202 changes: 202 additions & 0 deletions src/backend/base/langflow/services/telemetry/schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from typing import Any

from pydantic import BaseModel, Field

# Maximum URL length for telemetry GET requests (Scarf pixel tracking).
# Scarf supports up to 2KB (2048 bytes) for query parameters.
# ComponentInputsPayload.split_if_needed uses this value as its default limit
# and compares it against the character length of the whole encoded URL
# (base URL plus query string).
MAX_TELEMETRY_URL_SIZE = 2048


class BasePayload(BaseModel):
client_type: str | None = Field(default=None, serialization_alias="clientType")
Expand Down Expand Up @@ -36,9 +42,205 @@ class PlaygroundPayload(BasePayload):

class ComponentPayload(BasePayload):
    # Telemetry record for one component execution. Each field is serialized
    # under the camelCase alias given in its Field declaration.

    # Required fields.
    component_name: str = Field(serialization_alias="componentName")
    component_id: str = Field(serialization_alias="componentId")
    component_seconds: int = Field(serialization_alias="componentSeconds")
    component_success: bool = Field(serialization_alias="componentSuccess")

    # Optional fields; both default to None.
    component_error_message: str | None = Field(default=None, serialization_alias="componentErrorMessage")
    component_run_id: str | None = Field(default=None, serialization_alias="componentRunId")


class ComponentInputsPayload(BasePayload):
    """Separate payload for component input values, joined via component_run_id.

    The payload is measured as it would appear in a GET URL and can be split
    so that every resulting request stays within a maximum URL length:

    - If the encoded URL exceeds ``max_url_size`` (default
      ``MAX_TELEMETRY_URL_SIZE``), the input fields are distributed across
      several chunk payloads, preserving the original field order.
    - Every chunk repeats the fixed fields (component_run_id, component_id,
      component_name) and any other field already set on this payload, so the
      chunks can be joined downstream.
    - Chunks carry chunk_index and total_chunks metadata for reassembly.
    - A single field that cannot fit on its own is converted to a string and
      truncated with a "...[truncated]" marker.

    Usage:
        payload = ComponentInputsPayload(
            component_run_id="run-123",
            component_id="comp-456",
            component_name="MyComponent",
            component_inputs={"input1": "value1", "input2": "value2"}
        )
        chunks = payload.split_if_needed(max_url_size=2000)
        # Returns a list of 1+ payloads
    """

    component_run_id: str = Field(serialization_alias="componentRunId")
    component_id: str = Field(serialization_alias="componentId")
    component_name: str = Field(serialization_alias="componentName")
    component_inputs: dict[str, Any] = Field(serialization_alias="componentInputs")
    chunk_index: int | None = Field(None, serialization_alias="chunkIndex")
    total_chunks: int | None = Field(None, serialization_alias="totalChunks")

    def _calculate_url_size(self, base_url: str = "https://api.scarf.sh/v1/pixel") -> int:
        """Return the length of the URL this payload would produce.

        The payload is dumped by alias (skipping ``None`` and unset fields),
        ``componentInputs`` is serialized to a JSON string, and the result is
        encoded with ``urllib.parse.urlencode``.

        Args:
            base_url: Base URL for telemetry endpoint (default: Scarf pixel URL)

        Returns:
            Total character length of the encoded URL including all query parameters
        """
        from urllib.parse import urlencode

        import orjson

        params = self.model_dump(by_alias=True, exclude_none=True, exclude_unset=True)
        # The inputs dict travels as a single JSON-encoded query parameter.
        if "componentInputs" in params:
            params["componentInputs"] = orjson.dumps(params["componentInputs"]).decode("utf-8")
        # Build the URL string directly; no HTTP request object is needed to measure it.
        query_string = urlencode(params)
        url = f"{base_url}?{query_string}" if query_string else base_url
        return len(url)

    def _probe_size(self, inputs: dict[str, Any]) -> int:
        """Return the URL size of a hypothetical chunk of this payload holding ``inputs``.

        The probe is a copy of this payload, so every field already set on it
        is counted exactly as in the initial size check. The number of chunks
        can never exceed the number of input fields (each chunk holds at least
        one), so that count is used for both chunk_index and total_chunks: it
        has at least as many digits as any real value, which keeps the
        measurement a safe upper bound.
        """
        bound = max(len(self.component_inputs), 1)
        probe = self.model_copy(
            update={"component_inputs": inputs, "chunk_index": bound, "total_chunks": bound},
        )
        return probe._calculate_url_size()

    def _truncate_value_to_fit(self, key: str, value: Any, max_url_size: int) -> Any:
        """Truncate a value so that a chunk holding only this field fits within max_url_size.

        A binary search finds the longest prefix of the value's string form
        that, followed by the truncation marker, keeps the URL within the limit.

        Args:
            key: The field key
            value: The field value to truncate (non-strings are converted with ``str``)
            max_url_size: Maximum allowed URL size in characters

        Returns:
            The longest fitting prefix followed by "...[truncated]". If no
            prefix fits at all, only the marker is returned, which is the
            smallest value this method can produce.
        """
        truncation_suffix = "...[truncated]"
        str_value = value if isinstance(value, str) else str(value)

        # Invariant: a prefix of length `low` is either the untested zero-length
        # fallback or has been measured to fit; nothing longer than `high` fits.
        low, high = 0, len(str_value)
        while low < high:
            mid = (low + high + 1) // 2
            candidate = str_value[:mid] + truncation_suffix
            if self._probe_size({key: candidate}) <= max_url_size:
                low = mid
            else:
                high = mid - 1

        return str_value[:low] + truncation_suffix

    def split_if_needed(self, max_url_size: int = MAX_TELEMETRY_URL_SIZE) -> list["ComponentInputsPayload"]:
        """Split payload into multiple chunks if URL size exceeds max_url_size.

        Args:
            max_url_size: Maximum allowed URL length in characters (default: MAX_TELEMETRY_URL_SIZE)

        Returns:
            List of ComponentInputsPayload objects. ``[self]`` if the payload
            already fits, has no inputs, or cannot be split; otherwise one
            payload per chunk with chunk_index and total_chunks set.
        """
        from lfx.log.logger import logger

        # Already small enough: send unchanged, without chunk metadata.
        if self._calculate_url_size() <= max_url_size:
            return [self]

        # Fail-safe: only a dict of fields can be distributed across chunks.
        if not isinstance(self.component_inputs, dict):
            logger.warning(f"component_inputs is not a dict, cannot split: {type(self.component_inputs)}")
            return [self]

        # Nothing to distribute.
        if not self.component_inputs:
            return [self]

        chunks_data: list[dict[str, Any]] = []
        current_chunk_inputs: dict[str, Any] = {}

        for key, value in self.component_inputs.items():
            # Greedy packing: keep adding fields while the chunk still fits.
            if self._probe_size({**current_chunk_inputs, key: value}) <= max_url_size:
                current_chunk_inputs[key] = value
                continue

            # With an empty chunk the probe above already measured the field
            # alone, so it is known not to fit and needs no second measurement.
            fits_alone = bool(current_chunk_inputs) and self._probe_size({key: value}) <= max_url_size

            # Close the chunk that is full and start a new one with this field.
            if current_chunk_inputs:
                chunks_data.append(current_chunk_inputs)

            if not fits_alone:
                logger.warning(f"Truncating oversized field '{key}' in component_inputs")
                value = self._truncate_value_to_fit(key, value, max_url_size)
            current_chunk_inputs = {key: value}

        # Add final chunk
        if current_chunk_inputs:
            chunks_data.append(current_chunk_inputs)

        total_chunks = len(chunks_data)
        return [
            self.model_copy(
                update={
                    "component_inputs": chunk_inputs,
                    "chunk_index": chunk_index,
                    "total_chunks": total_chunks,
                },
            )
            for chunk_index, chunk_inputs in enumerate(chunks_data)
        ]


class ExceptionPayload(BasePayload):
Expand Down
15 changes: 15 additions & 0 deletions src/backend/base/langflow/services/telemetry/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from langflow.services.base import Service
from langflow.services.telemetry.opentelemetry import OpenTelemetry
from langflow.services.telemetry.schema import (
MAX_TELEMETRY_URL_SIZE,
ComponentIndexPayload,
ComponentInputsPayload,
ComponentPayload,
ExceptionPayload,
PlaygroundPayload,
Expand Down Expand Up @@ -146,6 +148,19 @@ async def log_package_playground(self, payload: PlaygroundPayload) -> None:
async def log_package_component(self, payload: ComponentPayload) -> None:
    """Queue a component execution payload, tagged with the string "component"."""
    event = (self.send_telemetry_data, payload, "component")
    await self._queue_event(event)

async def log_package_component_inputs(self, payload: ComponentInputsPayload) -> None:
    """Queue component input values, one event per chunk.

    The payload is asked to split itself against ``MAX_TELEMETRY_URL_SIZE``;
    each payload it returns is queued as its own "component_inputs" event, in
    the order returned.

    Args:
        payload: Component inputs payload to log
    """
    for piece in payload.split_if_needed(max_url_size=MAX_TELEMETRY_URL_SIZE):
        event = (self.send_telemetry_data, piece, "component_inputs")
        await self._queue_event(event)

async def log_component_index(self, payload: ComponentIndexPayload) -> None:
    """Queue a component index payload, tagged with the string "component_index"."""
    event = (self.send_telemetry_data, payload, "component_index")
    await self._queue_event(event)

Expand Down
Loading