Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Migrate published_flow to flow_version

Revision ID: 20251125000000
Revises: 20251121000000
Create Date: 2025-11-25 00:00:00.000000

This migration copies data from published_flow to flow_version.
For each published flow, we create a corresponding flow_version record with Published status.
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision: str = "20251125000000"
down_revision: Union[str, None] = "20251121000000"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Migrate published_flow data to flow_version."""
conn = op.get_bind()

# Insert data from published_flow into flow_version
# Use ROW_NUMBER to assign unique version numbers (1.0.0, 1.0.1, 1.0.2, etc.) per original flow
# Extract name from email: "[email protected]" -> "Jagveer", "[email protected]" -> "Sourabh Rai"
conn.execute(text("""
WITH numbered_flows AS (
SELECT
pf.*,
ROW_NUMBER() OVER (
PARTITION BY pf.flow_cloned_from
ORDER BY pf.created_at
) - 1 as version_number
FROM published_flow pf
WHERE pf.flow_cloned_from IS NOT NULL
)
INSERT INTO flow_version (
id,
original_flow_id,
version_flow_id,
status_id,
version,
title,
description,
tags,
agent_logo,
sample_id,
submitted_by,
submitted_by_name,
submitted_by_email,
submitted_at,
reviewed_by,
reviewed_by_name,
reviewed_by_email,
reviewed_at,
rejection_reason,
published_by,
published_by_name,
published_by_email,
published_at,
created_at,
updated_at
)
SELECT
pf.id, -- Use published_flow.id as flow_version.id
pf.flow_cloned_from, -- original_flow_id
pf.flow_id, -- version_flow_id
5, -- status_id = 5 (Published)
CONCAT('1.0.', pf.version_number), -- version: 1.0.0, 1.0.1, 1.0.2, etc.
pf.flow_name, -- title
pf.description, -- description
pf.tags, -- tags
pf.flow_icon, -- agent_logo
NULL, -- sample_id (will be set later)
pf.published_by, -- submitted_by
-- Extract name from email: replace dots with spaces and title case
INITCAP(REPLACE(SPLIT_PART(pf.published_by_username, '@', 1), '.', ' ')), -- submitted_by_name
pf.published_by_username, -- submitted_by_email
pf.created_at, -- submitted_at
pf.published_by, -- reviewed_by
'Rishi kumar', -- reviewed_by_name (hardcoded)
'[email protected]', -- reviewed_by_email (hardcoded)
pf.updated_at, -- reviewed_at
NULL, -- rejection_reason
pf.published_by, -- published_by
-- Extract name from email: replace dots with spaces and title case
INITCAP(REPLACE(SPLIT_PART(pf.published_by_username, '@', 1), '.', ' ')), -- published_by_name
pf.published_by_username, -- published_by_email
pf.updated_at, -- published_at
pf.created_at, -- created_at
pf.updated_at -- updated_at
FROM numbered_flows pf
"""))


def downgrade() -> None:
"""Remove migrated data from flow_version."""
conn = op.get_bind()

# Delete records that were migrated from published_flow
# We identify them by status_id = 5 (Published) and version starting with '1.0.'
conn.execute(text("""
DELETE FROM flow_version
WHERE status_id = 5
AND version LIKE '1.0.%'
AND id IN (
SELECT id FROM published_flow
)
"""))
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Migrate published_flow_input_sample to version_flow_input_sample

Revision ID: 20251125000001
Revises: 20251121000000
Create Date: 2025-11-25 00:00:01.000000

This migration copies data from published_flow_input_sample to version_flow_input_sample.
For each published flow input sample, we create a corresponding version input sample record.
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision: str = "20251125000001"
down_revision: Union[str, None] = "20251125000000"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Migrate published_flow_input_sample data to version_flow_input_sample."""
conn = op.get_bind()

# Insert data from published_flow_input_sample into version_flow_input_sample
# Join with published_flow to get the original flow_id (flow_cloned_from)
conn.execute(text("""
INSERT INTO version_flow_input_sample (
id,
flow_version_id,
original_flow_id,
version,
storage_account,
container_name,
file_names,
sample_text,
sample_output,
created_at,
updated_at
)
SELECT
gen_random_uuid(), -- Generate new UUID for id
input.published_flow_id, -- flow_version_id from published_flow_input_sample
pf.flow_cloned_from, -- original_flow_id from published_flow
'1.0.0', -- Default version for migrated data
input.storage_account, -- storage_account from published_flow_input_sample
input.container_name, -- container_name from published_flow_input_sample
input.file_names, -- file_names from published_flow_input_sample
input.sample_text, -- sample_text from published_flow_input_sample
input.sample_output, -- sample_output from published_flow_input_sample
NOW(), -- created_at
NOW() -- updated_at
FROM published_flow_input_sample input
INNER JOIN published_flow pf ON input.published_flow_id = pf.id
WHERE pf.flow_cloned_from IS NOT NULL -- Only migrate if we have an original flow reference
"""))


def downgrade() -> None:
"""Remove migrated data from version_flow_input_sample."""
conn = op.get_bind()

# Delete records that were migrated from published_flow_input_sample
# We identify them by version starting with '1.0.' and flow_version_id matching published_flow.id
conn.execute(text("""
DELETE FROM version_flow_input_sample
WHERE version LIKE '1.0.%'
AND flow_version_id IN (
SELECT id FROM published_flow
)
"""))
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Update flow_version sample_id with version_flow_input_sample references

Revision ID: 20251125000002
Revises: 20251125000001
Create Date: 2025-11-25 00:00:02.000000

This migration updates the sample_id field in flow_version table to reference
the corresponding version_flow_input_sample records.
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision: str = "20251125000002"
down_revision: Union[str, None] = "20251125000001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Update flow_version.sample_id with corresponding version_flow_input_sample.id."""
conn = op.get_bind()

# Update sample_id in flow_version by matching with version_flow_input_sample
# Match on flow_version.id = version_flow_input_sample.flow_version_id
conn.execute(text("""
UPDATE flow_version fv
SET sample_id = vfis.id
FROM version_flow_input_sample vfis
WHERE fv.id = vfis.flow_version_id
AND fv.status_id = 5 -- Only update Published flows from migration
AND fv.version LIKE '1.0.%' -- Only update migrated records
"""))


def downgrade() -> None:
"""Reset sample_id to NULL for migrated records."""
conn = op.get_bind()

# Reset sample_id to NULL for records that were updated in this migration
conn.execute(text("""
UPDATE flow_version
SET sample_id = NULL
WHERE status_id = 5
AND version LIKE '1.0.%'
AND sample_id IS NOT NULL
"""))
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Update flow_version user information from flow and user tables

Revision ID: 20251125000003
Revises: 20251125000002
Create Date: 2025-11-25 00:00:03.000000

This migration updates user-related fields in flow_version table by joining
with flow and user tables to get accurate user information.
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision: str = "20251125000003"
down_revision: Union[str, None] = "20251125000002"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Update flow_version user fields with data from flow and user tables."""
conn = op.get_bind()

# Update user-related fields in flow_version by joining with flow and user tables
conn.execute(text("""
UPDATE flow_version fv
SET
submitted_by = u.id,
submitted_by_name = INITCAP(REPLACE(SPLIT_PART(u.username, '@', 1), '.', ' ')),
submitted_by_email = u.username,
published_by = u.id,
published_by_name = INITCAP(REPLACE(SPLIT_PART(u.username, '@', 1), '.', ' ')),
published_by_email = u.username
FROM flow f
INNER JOIN "user" u ON f.user_id = u.id
WHERE fv.original_flow_id = f.id
"""))


def downgrade() -> None:
"""Reset user fields to NULL for all updated records."""
conn = op.get_bind()

# This is a data-only migration, downgrade would set fields back to NULL
# However, since we're updating existing data without filters,
# we cannot reliably restore previous values
# Keeping this as a placeholder - in practice, this migration should not be downgraded
pass