Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/controllers/service_api/app/completion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from flask import request
from flask_restful import Resource, reqparse
from werkzeug.exceptions import InternalServerError, NotFound

Expand All @@ -23,6 +24,7 @@
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
Expand Down Expand Up @@ -111,6 +113,10 @@ def post(self, app_model: App, end_user: EndUser):

args = parser.parse_args()

external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id

streaming = args["response_mode"] == "streaming"

try:
Expand Down
6 changes: 5 additions & 1 deletion api/controllers/service_api/app/workflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from dateutil.parser import isoparse
from flask import request
from flask_restful import Resource, fields, marshal_with, reqparse
from flask_restful.inputs import int_range
from sqlalchemy.orm import Session, sessionmaker
Expand All @@ -23,6 +24,7 @@
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from core.model_runtime.errors.invoke import InvokeError
from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
from extensions.ext_database import db
Expand Down Expand Up @@ -90,7 +92,9 @@ def post(self, app_model: App, end_user: EndUser):
parser.add_argument("files", type=list, required=False, location="json")
parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
args = parser.parse_args()

external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
streaming = args.get("response_mode") == "streaming"

try:
Expand Down
6 changes: 5 additions & 1 deletion api/core/app/apps/advanced_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotAppStreamResponse
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
Expand Down Expand Up @@ -112,7 +113,10 @@ def generate(
query = query.replace("\x00", "")
inputs = args["inputs"]

extras = {"auto_generate_conversation_name": args.get("auto_generate_name", False)}
extras = {
"auto_generate_conversation_name": args.get("auto_generate_name", False),
**extract_external_trace_id_from_args(args),
}

# get conversation
conversation = None
Expand Down
4 changes: 4 additions & 0 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ def _handle_workflow_succeeded_event(
outputs=event.outputs,
conversation_id=self._conversation_id,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
Expand Down Expand Up @@ -590,6 +591,7 @@ def _handle_workflow_partial_success_event(
exceptions_count=event.exceptions_count,
conversation_id=None,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
Expand Down Expand Up @@ -622,6 +624,7 @@ def _handle_workflow_failed_event(
conversation_id=self._conversation_id,
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
Expand Down Expand Up @@ -653,6 +656,7 @@ def _handle_stop_event(
error_message=event.get_stop_reason(),
conversation_id=self._conversation_id,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
Expand Down
6 changes: 6 additions & 0 deletions api/core/app/apps/workflow/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import DifyCoreRepositoryFactory
Expand Down Expand Up @@ -123,6 +124,10 @@ def generate(
)

inputs: Mapping[str, Any] = args["inputs"]

extras = {
**extract_external_trace_id_from_args(args),
}
workflow_run_id = str(uuid.uuid4())
# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
Expand All @@ -142,6 +147,7 @@ def generate(
call_depth=call_depth,
trace_manager=trace_manager,
workflow_execution_id=workflow_run_id,
extras=extras,
)

contexts.plugin_tool_providers.set({})
Expand Down
3 changes: 3 additions & 0 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ def _handle_workflow_succeeded_event(
outputs=event.outputs,
conversation_id=None,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)

# save workflow app log
Expand Down Expand Up @@ -524,6 +525,7 @@ def _handle_workflow_partial_success_event(
exceptions_count=event.exceptions_count,
conversation_id=None,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)

# save workflow app log
Expand Down Expand Up @@ -561,6 +563,7 @@ def _handle_workflow_failed_and_stop_events(
conversation_id=None,
trace_manager=trace_manager,
exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)

# save workflow app log
Expand Down
42 changes: 42 additions & 0 deletions api/core/helper/trace_id_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import re
from collections.abc import Mapping
from typing import Any, Optional


def is_valid_trace_id(trace_id: str) -> bool:
"""
Check if the trace_id is valid.

Requirements: 1-128 characters, only letters, numbers, '-', and '_'.
"""
return bool(re.match(r"^[a-zA-Z0-9\-_]{1,128}$", trace_id))


def get_external_trace_id(request: Any) -> Optional[str]:
"""
Retrieve the trace_id from the request.

Priority: header ('X-Trace-Id'), then parameters, then JSON body. Returns None if not provided or invalid.
"""
trace_id = request.headers.get("X-Trace-Id")
if not trace_id:
trace_id = request.args.get("trace_id")
if not trace_id and getattr(request, "is_json", False):
json_data = getattr(request, "json", None)
if json_data:
trace_id = json_data.get("trace_id")
if isinstance(trace_id, str) and is_valid_trace_id(trace_id):
return trace_id
return None


def extract_external_trace_id_from_args(args: Mapping[str, Any]) -> dict:
"""
Extract 'external_trace_id' from args.

Returns a dict suitable for use in extras. Returns an empty dict if not found.
"""
trace_id = args.get("external_trace_id")
if trace_id:
return {"external_trace_id": trace_id}
return {}
3 changes: 2 additions & 1 deletion api/core/ops/aliyun_trace/aliyun_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def get_project_url(self):
raise ValueError(f"Aliyun get run url failed: {str(e)}")

def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = convert_to_trace_id(trace_info.workflow_run_id)
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or convert_to_trace_id(trace_info.workflow_run_id)
workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
self.add_workflow_span(trace_id, workflow_span_id, trace_info)

Expand Down
3 changes: 2 additions & 1 deletion api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
}
workflow_metadata.update(trace_info.metadata)

trace_id = uuid_to_trace_id(trace_info.workflow_run_id)
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or uuid_to_trace_id(trace_info.workflow_run_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,
Expand Down
5 changes: 3 additions & 2 deletions api/core/ops/langfuse_trace/langfuse_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ def trace(self, trace_info: BaseTraceInfo):
self.generate_name_trace(trace_info)

def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.workflow_run_id
user_id = trace_info.metadata.get("user_id")
metadata = trace_info.metadata
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id

if trace_info.message_id:
trace_id = trace_info.message_id
trace_id = external_trace_id or trace_info.message_id
name = TraceTaskName.MESSAGE_TRACE.value
trace_data = LangfuseTrace(
id=trace_id,
Expand Down
3 changes: 2 additions & 1 deletion api/core/ops/langsmith_trace/langsmith_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def trace(self, trace_info: BaseTraceInfo):
self.generate_name_trace(trace_info)

def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.message_id or trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.message_id or trace_info.workflow_run_id
if trace_info.start_time is None:
trace_info.start_time = datetime.now()
message_dotted_order = (
Expand Down
5 changes: 3 additions & 2 deletions api/core/ops/opik_trace/opik_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,16 @@ def trace(self, trace_info: BaseTraceInfo):
self.generate_name_trace(trace_info)

def workflow_trace(self, trace_info: WorkflowTraceInfo):
dify_trace_id = trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
dify_trace_id = external_trace_id or trace_info.workflow_run_id
opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
workflow_metadata = wrap_metadata(
trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id
)
root_span_id = None

if trace_info.message_id:
dify_trace_id = trace_info.message_id
dify_trace_id = external_trace_id or trace_info.message_id
opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)

trace_data = {
Expand Down
4 changes: 4 additions & 0 deletions api/core/ops/ops_trace_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,10 @@ def workflow_trace(
"app_id": workflow_run.app_id,
}

external_trace_id = self.kwargs.get("external_trace_id")
if external_trace_id:
metadata["external_trace_id"] = external_trace_id

workflow_trace_info = WorkflowTraceInfo(
workflow_data=workflow_run.to_dict(),
conversation_id=conversation_id,
Expand Down
3 changes: 2 additions & 1 deletion api/core/ops/weave_trace/weave_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def trace(self, trace_info: BaseTraceInfo):
self.generate_name_trace(trace_info)

def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.message_id or trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.message_id or trace_info.workflow_run_id
if trace_info.start_time is None:
trace_info.start_time = datetime.now()

Expand Down
11 changes: 8 additions & 3 deletions api/core/workflow/workflow_cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def handle_workflow_run_success(
outputs: Mapping[str, Any] | None = None,
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
external_trace_id: Optional[str] = None,
) -> WorkflowExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id)

Expand All @@ -96,7 +97,7 @@ def handle_workflow_run_success(
total_steps=total_steps,
)

self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id, external_trace_id)

self._workflow_execution_repository.save(workflow_execution)
return workflow_execution
Expand All @@ -111,6 +112,7 @@ def handle_workflow_run_partial_success(
exceptions_count: int = 0,
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
external_trace_id: Optional[str] = None,
) -> WorkflowExecution:
execution = self._get_workflow_execution_or_raise_error(workflow_run_id)

Expand All @@ -123,7 +125,7 @@ def handle_workflow_run_partial_success(
exceptions_count=exceptions_count,
)

self._add_trace_task_if_needed(trace_manager, execution, conversation_id)
self._add_trace_task_if_needed(trace_manager, execution, conversation_id, external_trace_id)

self._workflow_execution_repository.save(execution)
return execution
Expand All @@ -139,6 +141,7 @@ def handle_workflow_run_failed(
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
exceptions_count: int = 0,
external_trace_id: Optional[str] = None,
) -> WorkflowExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id)
now = naive_utc_now()
Expand All @@ -154,7 +157,7 @@ def handle_workflow_run_failed(
)

self._fail_running_node_executions(workflow_execution.id_, error_message, now)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id, external_trace_id)

self._workflow_execution_repository.save(workflow_execution)
return workflow_execution
Expand Down Expand Up @@ -312,6 +315,7 @@ def _add_trace_task_if_needed(
trace_manager: Optional[TraceQueueManager],
workflow_execution: WorkflowExecution,
conversation_id: Optional[str],
external_trace_id: Optional[str],
) -> None:
"""Add trace task if trace manager is provided."""
if trace_manager:
Expand All @@ -321,6 +325,7 @@ def _add_trace_task_if_needed(
workflow_execution=workflow_execution,
conversation_id=conversation_id,
user_id=trace_manager.user_id,
external_trace_id=external_trace_id,
)
)

Expand Down
Loading
Loading