Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
733386b
feat(api/repo): Allow to config repository implementation
laipz8200 Jun 24, 2025
b2b4049
feat: Create a DifyAPIRepositoryFactory to handle workflow node execu…
laipz8200 Jun 27, 2025
b450acf
feat: Create a APIWorkflowRunRepository
laipz8200 Jun 27, 2025
6a11105
test(workflow_service): Fix test
laipz8200 Jun 27, 2025
641cabc
chore: add new configs to env example
laipz8200 Jun 27, 2025
8e71a9a
test(test_workflow_draft_variable_service): Fix tests
laipz8200 Jul 7, 2025
6076240
feat: Integrates execution repository inheritance
laipz8200 Jul 9, 2025
63bbcaf
refactor(token_buffer_memory): Convert ValueError to AssertionError
laipz8200 Jul 10, 2025
cf2b5c7
refactor(workflow_response_converter): Remove unused workflow_executi…
laipz8200 Jul 10, 2025
f585431
refactor(factory): Move import statement to the top
laipz8200 Jul 10, 2025
a0cfdb4
docs(api_workflow_node_execution_repository): Add notice about datat …
laipz8200 Jul 10, 2025
40a7089
refactor(api_workflow_node_execution_repository): Refactors workflow …
laipz8200 Jul 10, 2025
00462a0
refactor(api): Simplify the constructor for WorkflowDraftVariableService
QuantumGhost Jul 10, 2025
115c6e2
chore(api): improve type hints for `DifyCoreRepositoryFactory`
QuantumGhost Jul 10, 2025
00c50ed
test(api): fix tests broken by refactor
QuantumGhost Jul 10, 2025
7a47b86
refactor(remove_app_and_related_data_task): Remove 'expire_on_commit'…
laipz8200 Jul 14, 2025
42a195e
feat(api/repositories/sqlalchemy_api_workflow_run_repository.py): Imp…
laipz8200 Jul 14, 2025
0d1b27f
docs(api/repositories/api_workflow_node_execution_repository.py): Add…
laipz8200 Jul 14, 2025
7cabcea
docs(api/repositories/sqlalchemy_api_workflow_node_execution_reposito…
laipz8200 Jul 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,19 @@ MAX_VARIABLE_SIZE=204800
# hybrid: Save new data to object storage, read from both object storage and RDBMS
WORKFLOW_NODE_EXECUTION_STORAGE=rdbms

# Repository configuration
# Core workflow execution repository implementation
CORE_WORKFLOW_EXECUTION_REPOSITORY=core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository

# Core workflow node execution repository implementation
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY=core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository

# API workflow node execution repository implementation
API_WORKFLOW_NODE_EXECUTION_REPOSITORY=repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository

# API workflow run repository implementation
API_WORKFLOW_RUN_REPOSITORY=repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository

# App configuration
APP_MAX_EXECUTION_TIME=1200
APP_MAX_ACTIVE_REQUESTS=0
Expand Down
28 changes: 28 additions & 0 deletions api/configs/feature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,33 @@ class WorkflowNodeExecutionConfig(BaseSettings):
)


class RepositoryConfig(BaseSettings):
"""
Configuration for repository implementations
"""

CORE_WORKFLOW_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowExecution. Specify as a module path",
default="core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository",
)

CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowNodeExecution. Specify as a module path",
default="core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository",
)

API_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field(
description="Service-layer repository implementation for WorkflowNodeExecutionModel operations. "
"Specify as a module path",
default="repositories.sqlalchemy_api_workflow_node_execution_repository.DifyAPISQLAlchemyWorkflowNodeExecutionRepository",
)

API_WORKFLOW_RUN_REPOSITORY: str = Field(
description="Service-layer repository implementation for WorkflowRun operations. Specify as a module path",
default="repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository",
)


class AuthConfig(BaseSettings):
"""
Configuration for authentication and OAuth
Expand Down Expand Up @@ -903,6 +930,7 @@ class FeatureConfig(
MultiModalTransferConfig,
PositionConfig,
RagEtlConfig,
RepositoryConfig,
SecurityConfig,
ToolConfig,
UpdateConfig,
Expand Down
2 changes: 0 additions & 2 deletions api/controllers/console/app/wraps.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ def decorated_view(*args, **kwargs):
raise AppNotFoundError()

app_mode = AppMode.value_of(app_model.mode)
if app_mode == AppMode.CHANNEL:
raise AppNotFoundError()

if mode is not None:
if isinstance(mode, list):
Expand Down
14 changes: 11 additions & 3 deletions api/controllers/service_api/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dateutil.parser import isoparse
from flask_restful import Resource, fields, marshal_with, reqparse
from flask_restful.inputs import int_range
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, sessionmaker
from werkzeug.exceptions import InternalServerError

from controllers.service_api import api
Expand All @@ -30,7 +30,7 @@
from libs import helper
from libs.helper import TimestampField
from models.model import App, AppMode, EndUser
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory
from services.app_generate_service import AppGenerateService
from services.errors.llm import InvokeRateLimitError
from services.workflow_app_service import WorkflowAppService
Expand Down Expand Up @@ -63,7 +63,15 @@ def get(self, app_model: App, workflow_run_id: str):
if app_mode not in [AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]:
raise NotWorkflowAppError()

workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()
# Use repository to get workflow run
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)

workflow_run = workflow_run_repo.get_workflow_run_by_id(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
run_id=workflow_run_id,
)
return workflow_run


Expand Down
15 changes: 10 additions & 5 deletions api/core/agent/base_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import uuid
from typing import Optional, Union, cast

from sqlalchemy import select

from core.agent.entities import AgentEntity, AgentToolEntity
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfig
Expand Down Expand Up @@ -417,12 +419,15 @@ def organize_agent_history(self, prompt_messages: list[PromptMessage]) -> list[P
if isinstance(prompt_message, SystemPromptMessage):
result.append(prompt_message)

messages: list[Message] = (
db.session.query(Message)
.filter(
Message.conversation_id == self.message.conversation_id,
messages = (
(
db.session.execute(
select(Message)
.where(Message.conversation_id == self.message.conversation_id)
.order_by(Message.created_at.desc())
)
)
.order_by(Message.created_at.desc())
.scalars()
.all()
)

Expand Down
15 changes: 7 additions & 8 deletions api/core/app/apps/advanced_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.repositories import DifyCoreRepositoryFactory
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
Expand Down Expand Up @@ -183,14 +182,14 @@ def generate(
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
Expand Down Expand Up @@ -260,14 +259,14 @@ def single_iteration_generate(
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
)
# Create workflow node execution repository
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
Expand Down Expand Up @@ -343,14 +342,14 @@ def single_loop_generate(
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
)
# Create workflow node execution repository
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
Expand Down
19 changes: 7 additions & 12 deletions api/core/app/apps/workflow/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.repositories import DifyCoreRepositoryFactory
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
Expand Down Expand Up @@ -156,14 +155,14 @@ def generate(
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
Expand Down Expand Up @@ -306,16 +305,14 @@ def single_iteration_generate(
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
)
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)

workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
Expand Down Expand Up @@ -390,16 +387,14 @@ def single_loop_generate(
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
workflow_execution_repository = SQLAlchemyWorkflowExecutionRepository(
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
)
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)

workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
Expand Down
12 changes: 4 additions & 8 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from collections.abc import Generator
from typing import Optional, Union

from sqlalchemy import select
from sqlalchemy.orm import Session

from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
Expand Down Expand Up @@ -68,7 +67,6 @@
Workflow,
WorkflowAppLog,
WorkflowAppLogCreatedFrom,
WorkflowRun,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -562,8 +560,6 @@ def _process_stream_response(
tts_publisher.publish(None)

def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None:
workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id_))
assert workflow_run is not None
invoke_from = self._application_generate_entity.invoke_from
if invoke_from == InvokeFrom.SERVICE_API:
created_from = WorkflowAppLogCreatedFrom.SERVICE_API
Expand All @@ -576,10 +572,10 @@ def _save_workflow_app_log(self, *, session: Session, workflow_execution: Workfl
return

workflow_app_log = WorkflowAppLog()
workflow_app_log.tenant_id = workflow_run.tenant_id
workflow_app_log.app_id = workflow_run.app_id
workflow_app_log.workflow_id = workflow_run.workflow_id
workflow_app_log.workflow_run_id = workflow_run.id
workflow_app_log.tenant_id = self._application_generate_entity.app_config.tenant_id
workflow_app_log.app_id = self._application_generate_entity.app_config.app_id
workflow_app_log.workflow_id = workflow_execution.workflow_id
workflow_app_log.workflow_run_id = workflow_execution.id_
workflow_app_log.created_from = created_from.value
workflow_app_log.created_by_role = self._created_by_role
workflow_app_log.created_by = self._user_id
Expand Down
52 changes: 25 additions & 27 deletions api/core/memory/token_buffer_memory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from collections.abc import Sequence
from typing import Optional

from sqlalchemy import select

from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.file import file_manager
from core.model_manager import ModelInstance
Expand All @@ -17,11 +19,15 @@
from extensions.ext_database import db
from factories import file_factory
from models.model import AppMode, Conversation, Message, MessageFile
from models.workflow import WorkflowRun
from models.workflow import Workflow, WorkflowRun


class TokenBufferMemory:
def __init__(self, conversation: Conversation, model_instance: ModelInstance) -> None:
def __init__(
self,
conversation: Conversation,
model_instance: ModelInstance,
) -> None:
self.conversation = conversation
self.model_instance = model_instance

Expand All @@ -36,28 +42,18 @@ def get_history_prompt_messages(
app_record = self.conversation.app

# fetch limited messages, and return reversed
query = (
db.session.query(
Message.id,
Message.query,
Message.answer,
Message.created_at,
Message.workflow_run_id,
Message.parent_message_id,
Message.answer_tokens,
)
.filter(
Message.conversation_id == self.conversation.id,
)
.order_by(Message.created_at.desc())
stmt = (
select(Message).where(Message.conversation_id == self.conversation.id).order_by(Message.created_at.desc())
)

if message_limit and message_limit > 0:
message_limit = min(message_limit, 500)
else:
message_limit = 500

messages = query.limit(message_limit).all()
stmt = stmt.limit(message_limit)

messages = db.session.scalars(stmt).all()

# instead of all messages from the conversation, we only need to extract messages
# that belong to the thread of last message
Expand All @@ -74,18 +70,20 @@ def get_history_prompt_messages(
files = db.session.query(MessageFile).filter(MessageFile.message_id == message.id).all()
if files:
file_extra_config = None
if self.conversation.mode not in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
if self.conversation.mode in {AppMode.AGENT_CHAT, AppMode.COMPLETION, AppMode.CHAT}:
file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config)
elif self.conversation.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow_run = db.session.scalar(
select(WorkflowRun).where(WorkflowRun.id == message.workflow_run_id)
)
if not workflow_run:
raise ValueError(f"Workflow run not found: {message.workflow_run_id}")
workflow = db.session.scalar(select(Workflow).where(Workflow.id == workflow_run.workflow_id))
if not workflow:
raise ValueError(f"Workflow not found: {workflow_run.workflow_id}")
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
else:
if message.workflow_run_id:
workflow_run = (
db.session.query(WorkflowRun).filter(WorkflowRun.id == message.workflow_run_id).first()
)

if workflow_run and workflow_run.workflow:
file_extra_config = FileUploadConfigManager.convert(
workflow_run.workflow.features_dict, is_vision=False
)
raise AssertionError(f"Invalid app mode: {self.conversation.mode}")

detail = ImagePromptMessageContent.DETAIL.LOW
if file_extra_config and app_record:
Expand Down
Loading