Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/models/top_waiters.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* View of top waiters. */
MODEL (
name sushi.top_waiters,
kind full,
kind view,
owner jen
);

Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def create_plan_evaluator(self, context: Context) -> PlanEvaluator:
dag_creation_max_retry_attempts=self.dag_creation_max_retry_attempts,
console=context.console,
notification_targets=context.notification_targets,
ddl_concurrent_tasks=context.ddl_concurrent_tasks,
)


Expand Down Expand Up @@ -286,6 +287,8 @@ class Config(PydanticModel):
physical_schema: The default schema used to store materialized tables.
snapshot_ttl: Duration before unpromoted snapshots are removed.
time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
ddl_concurrent_tasks: The number of concurrent tasks used for DDL
operations (table / view creation, deletion, etc). Default: 1.
"""

engine_adapter: EngineAdapter = Field(
Expand All @@ -298,6 +301,7 @@ class Config(PydanticModel):
snapshot_ttl: str = ""
ignore_patterns: t.List[str] = []
time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
ddl_concurrent_tasks: int = 1

class Config:
arbitrary_types_allowed = True
13 changes: 10 additions & 3 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Context:
physical_schema: The schema used to store physical materialized tables.
snapshot_ttl: Duration before unpromoted snapshots are removed.
path: The directory containing SQLMesh files.
ddl_concurrent_tasks: The number of concurrent tasks used for DDL
operations (table / view creation, deletion, etc). Default: 1.
config: A Config object or the name of a Config object in config.py.
test_config: A Config object or name of a Config object in config.py to use for testing only
load: Whether or not to automatically load all models and macros (default True).
Expand All @@ -97,6 +99,7 @@ def __init__(
physical_schema: str = "",
snapshot_ttl: str = "",
path: str = "",
ddl_concurrent_tasks: t.Optional[int] = None,
config: t.Optional[t.Union[Config, str]] = None,
test_config: t.Optional[t.Union[Config, str]] = None,
load: bool = True,
Expand All @@ -120,7 +123,12 @@ def __init__(
self.macros = UniqueKeyDict("macros")
self.dag: DAG[str] = DAG()
self.engine_adapter = engine_adapter or self.config.engine_adapter
self.snapshot_evaluator = SnapshotEvaluator(self.engine_adapter)
self.ddl_concurrent_tasks = (
ddl_concurrent_tasks or self.config.ddl_concurrent_tasks
)
self.snapshot_evaluator = SnapshotEvaluator(
self.engine_adapter, ddl_concurrent_tasks=self.ddl_concurrent_tasks
)
self._ignore_patterns = c.IGNORE_PATTERNS + self.config.ignore_patterns
self.console = console or get_console()

Expand Down Expand Up @@ -705,5 +713,4 @@ def _glob_path(
def _add_model_to_dag(self, model: Model) -> None:
self.dag.graph[model.name] = set()

for table in model.depends_on:
self.dag.add(model.name, table)
self.dag.add(model.name, model.depends_on)
14 changes: 7 additions & 7 deletions sqlmesh/core/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ class DAG(t.Generic[T]):
def __init__(self, graph: t.Optional[t.Dict[T, t.Set[T]]] = None):
self.graph = graph or {}

def add(self, node: T, dependency: t.Optional[T] = None) -> None:
def add(self, node: T, dependencies: t.Optional[t.Iterable[T]] = None) -> None:
"""Add a node to the graph with optional upstream dependencies.

Args:
node: The node to add.
dependency: An optional dependency to add to the node.
dependencies: Optional dependencies to add to the node.
"""
if node not in self.graph:
self.graph[node] = set()
if dependency:
self.graph[node].add(dependency)
if dependencies:
self.graph[node].update(dependencies)

def subdag(self, *nodes: T) -> DAG[T]:
"""Create a new subdag given node(s).
Expand All @@ -50,7 +50,7 @@ def subdag(self, *nodes: T) -> DAG[T]:

def upstream(self, node: T) -> t.List[T]:
"""Returns all upstream dependencies in topologically sorted order."""
return self.subdag(node).sort()[:-1]
return self.subdag(node).sorted()[:-1]

@property
def leaves(self) -> t.Set[T]:
Expand All @@ -59,7 +59,7 @@ def leaves(self) -> t.Set[T]:
dep for deps in self.graph.values() for dep in deps if dep not in self.graph
}

def sort(self) -> t.List[T]:
def sorted(self) -> t.List[T]:
"""Topologically sort the graph.

Returns:
Expand Down Expand Up @@ -98,7 +98,7 @@ def downstream(self, node: T) -> t.List[T]:
Returns:
A list of descendant nodes sorted in topological order.
"""
sorted_nodes = self.sort()
sorted_nodes = self.sorted()
try:
node_index = sorted_nodes.index(node)
except ValueError:
Expand Down
6 changes: 3 additions & 3 deletions sqlmesh/core/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,9 +814,9 @@ def _render_query(
end: The end datetime to render. Defaults to epoch start.
latest: The latest datetime to use for non-incremental queries. Defaults to epoch start.
add_incremental_filter: Add an incremental filter to the query if the model is incremental.
snapshots: All snapshots to use for expansion and mapping of physical locations.
snapshots: All upstream snapshots to use for expansion and mapping of physical locations.
If passing snapshots is undesirable, mapping can be used instead to manually map tables.
mapping: Mapping to replace table names, if not set, the mapping wil be created from snapshots.
mapping: Mapping to replace table names, if not set, the mapping will be created from snapshots.
expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
that depend on materialized tables. Model definitions are inlined and can thus be run end to
end on the fly.
Expand Down Expand Up @@ -913,7 +913,7 @@ def ctas_query(self, snapshots: t.Dict[str, Snapshot]) -> exp.Subqueryable:
SELECTS and hopefully the optimizer is smart enough to not do anything.

Args:
All upstream snapshots of this model so queries can be expanded.
snapshots: All upstream snapshots of this model so queries can be expanded.
Return:
The mocked out ctas query.
"""
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def snapshot_change_category(self, snapshot: Snapshot) -> SnapshotChangeCategory
def info_cache(self) -> InfoCache:
"""Returns the info cache of categorized, uncategorized snapshots."""
if self._info_cache is None:
queue = deque(self._dag.sort())
queue = deque(self._dag.sorted())
snapshots = []
all_indirectly_modified = set()

Expand Down
37 changes: 23 additions & 14 deletions sqlmesh/core/plan_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,18 @@ def _push(self, plan: Plan) -> None:
Args:
plan: The plan to source snapshots from.
"""
snapshots = {snapshot.name: snapshot for snapshot in plan.new_snapshots}
self.state_sync.push_snapshots(snapshots.values())
for snapshot in snapshots.values():
self.snapshot_evaluator.create(snapshot, snapshots)
parent_snapshot_ids = {
Comment thread
izeigerman marked this conversation as resolved.
p_sid for snapshot in plan.new_snapshots for p_sid in snapshot.parents
}

stored_snapshots_by_id = self.state_sync.get_snapshots(parent_snapshot_ids)
new_snapshots_by_id = {
snapshot.snapshot_id: snapshot for snapshot in plan.new_snapshots
}
all_snapshots_by_id = {**stored_snapshots_by_id, **new_snapshots_by_id}

self.snapshot_evaluator.create(plan.new_snapshots, all_snapshots_by_id)
self.state_sync.push_snapshots(plan.new_snapshots)

def _promote(self, plan: Plan) -> None:
"""Promote a plan.
Expand All @@ -110,16 +118,14 @@ def _promote(self, plan: Plan) -> None:

added, removed = self.state_sync.promote(environment, no_gaps=plan.no_gaps)

for snapshot_table_info in added:
self.snapshot_evaluator.promote(
snapshot_table_info,
environment=environment.name,
)
for snapshot_table_info in removed:
self.snapshot_evaluator.demote(
snapshot_table_info,
environment=environment.name,
)
self.snapshot_evaluator.promote(
added,
environment=environment.name,
)
self.snapshot_evaluator.demote(
removed,
environment=environment.name,
)


class AirflowPlanEvaluator(PlanEvaluator):
Expand All @@ -132,6 +138,7 @@ def __init__(
dag_creation_poll_interval_secs: int = 30,
dag_creation_max_retry_attempts: int = 10,
notification_targets: t.Optional[t.List[NotificationTarget]] = None,
ddl_concurrent_tasks: int = 1,
):
self.airflow_client = airflow_client
self.blocking = blocking
Expand All @@ -140,6 +147,7 @@ def __init__(
self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
self.console = console or get_console()
self.notification_targets = notification_targets or []
self.ddl_concurrent_tasks = ddl_concurrent_tasks

def evaluate(self, plan: Plan) -> None:
environment = plan.environment
Expand All @@ -153,6 +161,7 @@ def evaluate(self, plan: Plan) -> None:
no_gaps=plan.no_gaps,
restatements=plan.restatements,
notification_targets=self.notification_targets,
ddl_concurrent_tasks=self.ddl_concurrent_tasks,
)

if self.blocking:
Expand Down
8 changes: 5 additions & 3 deletions sqlmesh/core/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
fingerprint: str
version: str
physical_schema: str
parents: t.Tuple[SnapshotId, ...]
previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
change_category: t.Optional[SnapshotChangeCategory]

Expand Down Expand Up @@ -233,7 +234,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
fingerprint: str
physical_schema: str
model: Model
parents: t.List[SnapshotId]
parents: t.Tuple[SnapshotId, ...]
intervals: Intervals
created_ts: int
updated_ts: int
Expand Down Expand Up @@ -327,7 +328,7 @@ def from_model(
),
physical_schema=physical_schema,
model=model,
parents=[
parents=tuple(
SnapshotId(
name=name,
fingerprint=fingerprint_from_model(
Expand All @@ -338,7 +339,7 @@ def from_model(
),
)
for name in _parents_from_model(model, models)
],
),
intervals=[],
created_ts=created_ts,
updated_ts=created_ts,
Expand Down Expand Up @@ -509,6 +510,7 @@ def table_info(self) -> SnapshotTableInfo:
name=self.name,
fingerprint=self.fingerprint,
version=self.version,
parents=self.parents,
previous_versions=self.previous_versions,
change_category=self.change_category,
)
Expand Down
Loading