Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ def diff(ctx, environment: t.Optional[str] = None) -> None:
is_flag=True,
help="Skip the backfill step.",
)
@click.option(
"--forward-only",
is_flag=True,
help="Create a plan for forward-only changes.",
)
@click.option(
"--no-prompts",
is_flag=True,
Expand Down
85 changes: 24 additions & 61 deletions sqlmesh/core/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from rich.syntax import Syntax
from rich.tree import Tree

from sqlmesh.core import constants as c
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
from sqlmesh.core.test import ModelTest
from sqlmesh.utils import rich as srich
Expand All @@ -31,7 +30,7 @@
SNAPSHOT_CHANGE_CATEGORY_STR = {
SnapshotChangeCategory.BREAKING: "Breaking",
SnapshotChangeCategory.NON_BREAKING: "Non-breaking",
SnapshotChangeCategory.NO_CHANGE: "No change",
SnapshotChangeCategory.FORWARD_ONLY: "Forward-only",
}


Expand Down Expand Up @@ -231,9 +230,7 @@ def plan(self, plan: Plan, auto_apply: bool) -> None:
plan: The plan to make choices for.
auto_apply: Whether to automatically apply the plan after all choices have been made.
"""
unbounded_end = (
plan.context_diff.environment == c.PROD and plan.is_unbounded_end
)
unbounded_end = not plan.is_dev and plan.is_unbounded_end
self._prompt_categorize(plan, auto_apply)
self._show_options_after_categorization(
plan, auto_apply, unbounded_end=unbounded_end
Expand Down Expand Up @@ -379,29 +376,16 @@ def loading_stop(self, id: uuid.UUID) -> None:
def _get_snapshot_change_category(
self, snapshot: Snapshot, plan: Plan, auto_apply: bool
) -> None:
if plan.indirectly_modified[snapshot.name]:
choices = self._snapshot_change_choices(snapshot)
response = Prompt.ask(
"\n".join(
[f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())]
),
console=self.console,
show_choices=False,
choices=[f"{i+1}" for i in range(len(choices))],
)
choice = list(choices)[int(response) - 1]
elif not snapshot.is_materialized:
choice = (
SnapshotChangeCategory.BREAKING
if Confirm.ask(
f"Does this change require a backfill of [direct]{snapshot.name}[/direct]?",
console=self.console,
)
else SnapshotChangeCategory.NO_CHANGE
)
else:
choice = SnapshotChangeCategory.NON_BREAKING

choices = self._snapshot_change_choices(snapshot)
response = Prompt.ask(
"\n".join(
[f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())]
),
console=self.console,
show_choices=False,
choices=[f"{i+1}" for i in range(len(choices))],
)
choice = list(choices)[int(response) - 1]
plan.set_choice(snapshot, choice)

def _snapshot_change_choices(
Expand All @@ -421,13 +405,12 @@ def _snapshot_change_choices(
elif snapshot.is_embedded_kind:
choices = {
SnapshotChangeCategory.BREAKING: f"Backfill {indirect}",
SnapshotChangeCategory.NO_CHANGE: f"Don't backfill {indirect}",
SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}",
}
else:
choices = {
SnapshotChangeCategory.BREAKING: f"Backfill {direct} and {indirect}",
SnapshotChangeCategory.NON_BREAKING: f"Backfill {direct} but not {indirect}",
SnapshotChangeCategory.NO_CHANGE: f"Don't backfill {direct} or {indirect}",
}
labeled_choices = {
k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" for k, v in choices.items()
Expand Down Expand Up @@ -561,7 +544,7 @@ def unbounded_end_callback(change):

unbounded_end_date_widget = (
[_checkbox("Unbounded End Date", unbounded_end, unbounded_end_callback)]
if plan.environment.name == c.PROD
if not plan.is_dev
else []
)

Expand Down Expand Up @@ -618,41 +601,21 @@ def radio_button_selected(change):
plan.set_choice(snapshot, choices[change["owner"].index])
self._show_options_after_categorization(plan)

if plan.indirectly_modified[snapshot.name]:
choice_mapping = self._snapshot_change_choices(
snapshot, use_rich_formatting=False
)
radio = widgets.RadioButtons(
options=choice_mapping.values(),
layout={"width": "max-content"},
disabled=False,
)
else:
if snapshot.is_view_kind or snapshot.is_embedded_kind:
choice_mapping = {
SnapshotChangeCategory.NON_BREAKING: f"Update {snapshot.name}"
}
else:
choice_mapping = {
SnapshotChangeCategory.NON_BREAKING: f"Backfill {snapshot.name}",
SnapshotChangeCategory.NO_CHANGE: f"Don't backfill {snapshot.name}",
}
choice_mapping = {
k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}"
for k, v in choice_mapping.items()
}
radio = widgets.RadioButtons(
options=choice_mapping.values(),
layout={"width": "max-content"},
disables=False,
)

choice_mapping = self._snapshot_change_choices(
snapshot, use_rich_formatting=False
)
choices = list(choice_mapping)
plan.set_choice(snapshot, choices[0])

radio = widgets.RadioButtons(
options=choice_mapping.values(),
layout={"width": "max-content"},
disabled=False,
)
radio.observe(
radio_button_selected,
"value",
)
plan.set_choice(snapshot, choices[0])
self.display(radio)

def log_test_results(
Expand Down
11 changes: 5 additions & 6 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from sqlmesh.core.user import User
from sqlmesh.utils import UniqueKeyDict, sys_path
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import TimeLike, now, yesterday_ds
from sqlmesh.utils.date import TimeLike, yesterday_ds
from sqlmesh.utils.errors import (
ConfigError,
MissingDependencyError,
Expand Down Expand Up @@ -572,6 +572,7 @@ def plan(
restate_from: t.Optional[t.Iterable[str]] = None,
no_gaps: bool = False,
skip_backfill: bool = False,
forward_only: bool = False,
no_prompts: bool = False,
auto_apply: bool = False,
) -> Plan:
Expand All @@ -595,6 +596,7 @@ def plan(
part of the target environment have no data gaps when compared against previous
snapshots for same models.
skip_backfill: Whether to skip the backfill step. Default: False.
forward_only: Whether the purpose of the plan is to make forward only changes.
no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
if this flag is set to true and there are uncategorized changes the plan creation will
fail. Default: False.
Expand Down Expand Up @@ -637,13 +639,10 @@ def plan(
restate_from=restate_from,
no_gaps=no_gaps,
skip_backfill=skip_backfill,
is_dev=environment != c.PROD,
forward_only=forward_only,
)

if environment != c.PROD and not end:
# Set default end after plan creation to make sure the prompt for the end date
# still shows up.
plan.end = now()

if not no_prompts:
self.console.plan(plan, auto_apply)
elif auto_apply:
Expand Down
63 changes: 54 additions & 9 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from sqlmesh.utils import random_id
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import TimeLike, make_inclusive, now, to_ds, validate_date_range
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.utils.errors import PlanError, SQLMeshError
from sqlmesh.utils.pydantic import PydanticModel

SnapshotMapping = t.Dict[str, t.Set[str]]
Expand All @@ -39,6 +39,8 @@ class Plan:
part of the target environment have no data gaps when compared against previous
snapshots for same models.
skip_backfill: Whether to skip the backfill step.
is_dev: Whether this plan is for development purposes.
forward_only: Whether the purpose of the plan is to make forward only changes.
"""

def __init__(
Expand All @@ -52,6 +54,8 @@ def __init__(
restate_from: t.Optional[t.Iterable[str]] = None,
no_gaps: bool = False,
skip_backfill: bool = False,
is_dev: bool = False,
forward_only: bool = False,
):
self.context_diff = context_diff
self.override_start = start is not None
Expand All @@ -60,8 +64,10 @@ def __init__(
self.restatements = set()
self.no_gaps = no_gaps
self.skip_backfill = skip_backfill
self.is_dev = is_dev
self.forward_only = forward_only
self._start = start
self._end = end
self._end = end if end or not is_dev else now()
self._apply = apply
self._dag = dag
self._state_reader = state_reader
Expand Down Expand Up @@ -205,9 +211,12 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> None
"""Sets a snapshot version based on the user choice.

Args:
snapshot: The snapshot to version.
choice: The user decision on how to version the snapshot and it's children.
snapshot: The target snapshot.
choice: The user decision on how to version the target snapshot and its children.
"""
if self.forward_only:
raise PlanError("Choice setting is not supported by a forward-only plan.")

snapshot.change_category = choice
if choice in (
SnapshotChangeCategory.BREAKING,
Expand Down Expand Up @@ -247,20 +256,22 @@ def snapshot_change_category(self, snapshot: Snapshot) -> SnapshotChangeCategory
"""
if snapshot not in self.snapshots:
raise SQLMeshError(
f"Snapshot {snapshot.snapshot_id} does not exist in this plan"
f"Snapshot {snapshot.snapshot_id} does not exist in this plan."
)

if not snapshot.version:
raise SQLMeshError(
f"Snapshot {snapshot.snapshot_id} has not be categorized yet"
f"Snapshot {snapshot.snapshot_id} has not be categorized yet."
)

if snapshot.name not in self.context_diff.modified_snapshots:
return SnapshotChangeCategory.NO_CHANGE
raise SQLMeshError(
f"Snapshot {snapshot.snapshot_id} has not been modified."
)

current, previous = self.context_diff.modified_snapshots[snapshot.name]
if current.version == previous.version:
return SnapshotChangeCategory.NO_CHANGE
return SnapshotChangeCategory.FORWARD_ONLY

if current.data_hash_matches(previous):
return SnapshotChangeCategory.BREAKING
Expand Down Expand Up @@ -303,8 +314,19 @@ def _categorize_snapshots(self) -> t.Tuple[t.List[Snapshot], SnapshotMapping]:
snapshot = self.context_diff.snapshots[model_name]

if model_name in self.context_diff.modified_snapshots:
if self.forward_only:
# In case of the forward only plan any modifications result in reuse of the
# previous version.
snapshot.set_version(snapshot.previous_version)

upstream_model_names = self._dag.upstream(model_name)

if self.context_diff.directly_modified(model_name):
added_and_directly_modified.append(snapshot)
if not self.forward_only:
self._ensure_no_paused_forward_only_upstream(
model_name, upstream_model_names
)
else:
all_indirectly_modified.add(model_name)

Expand All @@ -314,11 +336,16 @@ def _categorize_snapshots(self) -> t.Tuple[t.List[Snapshot], SnapshotMapping]:
if not snapshot.version and not any(
self.context_diff.directly_modified(upstream)
and not self.context_diff.snapshots[upstream].version
for upstream in self._dag.upstream(model_name)
for upstream in upstream_model_names
):
snapshot.set_version()

elif model_name in self.context_diff.added:
if self.forward_only:
raise PlanError(
"New models can't be added as part of the forward-only plan."
)

snapshot.set_version()
added_and_directly_modified.append(snapshot)

Expand All @@ -334,6 +361,24 @@ def _categorize_snapshots(self) -> t.Tuple[t.List[Snapshot], SnapshotMapping]:
indirectly_modified,
)

def _ensure_no_paused_forward_only_upstream(
self, model_name: str, upstream_model_names: t.Iterable[str]
) -> None:
for upstream in upstream_model_names:
upstream_snapshot = self.context_diff.snapshots[upstream]
if (
upstream_snapshot.version
and upstream_snapshot.is_forward_only
and upstream_snapshot.is_paused
):
raise PlanError(
f"Modified model '{model_name}' depends on a paused version of model '{upstream}'. "
"Possible remedies: "
"1) make sure your codebase is up-to-date; "
f"2) promote the current version of model '{upstream}' in the production environment; "
"3) recreate this plan in a forward-only mode."
)


class PlanStatus(str, Enum):
STARTED = "started"
Expand Down
11 changes: 3 additions & 8 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import abc
import typing as t

from sqlmesh.core import constants as c
from sqlmesh.core._typing import NotificationTarget
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.plan.definition import Plan
Expand Down Expand Up @@ -45,10 +44,6 @@ def evaluate(self, plan: Plan) -> None:
plan: The plan to evaluate.
"""

def _is_dev_plan(self, plan: Plan) -> bool:
"""Returns True if the given plan is for development purposes."""
return plan.environment.name != c.PROD


class BuiltInPlanEvaluator(PlanEvaluator):
def __init__(
Expand Down Expand Up @@ -85,7 +80,7 @@ def evaluate(self, plan: Plan) -> None:
max_workers=self.backfill_concurrent_tasks,
console=self.console,
)
scheduler.run(plan.start, plan.end, is_dev=self._is_dev_plan(plan))
scheduler.run(plan.start, plan.end, is_dev=plan.is_dev)

self._promote(plan)

Expand Down Expand Up @@ -132,7 +127,7 @@ def _promote(self, plan: Plan) -> None:
self.snapshot_evaluator.promote(
added,
environment=environment.name,
is_dev=self._is_dev_plan(plan),
is_dev=plan.is_dev,
)
self.snapshot_evaluator.demote(
removed,
Expand Down Expand Up @@ -181,7 +176,7 @@ def evaluate(self, plan: Plan) -> None:
backfill_concurrent_tasks=self.backfill_concurrent_tasks,
ddl_concurrent_tasks=self.ddl_concurrent_tasks,
users=self.users,
is_dev=self._is_dev_plan(plan),
is_dev=plan.is_dev,
)

if self.blocking:
Expand Down
Loading