Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pr review
  • Loading branch information
tobymao committed Dec 9, 2022
commit 7fbf4308100b1984a86f4f6d504e8159002aff71
8 changes: 4 additions & 4 deletions example/models/order_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ def execute(
) -> pd.DataFrame:
dfs = []

raw_orders = context.table("sushi.orders")
orders_table = context.table("sushi.orders")
items_table = context.table("sushi.items")

for dt in iter_dates(start, end):
# this section not super clean, make it easier to fetch other snapshots
orders = context.fetchdf(
f"""
SELECT *
FROM {raw_orders}
FROM {orders_table}
WHERE ds = '{to_ds(dt)}'
"""
)
Expand All @@ -54,7 +54,7 @@ def execute(
items = context.fetchdf(
f"""
SELECT *
FROM {raw_orders}
FROM {items_table}
WHERE ds = '{to_ds(dt)}'
"""
)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"requests",
"rich",
"ruamel.yaml",
"sqlglot>=10.2.4",
"sqlglot>=10.2.5",
],
extras_require={
"dev": [
Expand Down
65 changes: 48 additions & 17 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"""
from __future__ import annotations

import abc
import contextlib
import importlib
import types
Expand Down Expand Up @@ -74,21 +75,23 @@
extend_sqlglot()


class ExecutionContext:
"""The minimal context needed in order to execute a query.
Args:
engine_adapter: The engine adapter to execute queries against.
mapping: A mapping of models to physical tables.
"""
class BaseContext(abc.ABC):
"""The base context which defines methods to execute a model."""

def __init__(self, engine_adapter: EngineAdapter, mapping: t.Dict[str, str]):
self.engine_adapter = engine_adapter
self.spark = self.engine_adapter.spark
self._mapping = mapping
@property
@abc.abstractmethod
def model_tables(self) -> t.Dict[str, str]:
"""Returns a mapping of model names to tables."""

@property
def mapping(self) -> t.Dict[str, str]:
return self._mapping
@abc.abstractmethod
def engine_adapter(self) -> EngineAdapter:
"""Returns an engine adapter."""

@property
def spark(self) -> t.Optional["pyspark.sql.SparkSession"]: # type: ignore
"""Returns the spark session if it exists."""
return self.engine_adapter.spark

def table(self, model_name: str) -> str:
"""Gets the physical table name for a given model.
Expand All @@ -99,7 +102,7 @@ def table(self, model_name: str) -> str:
Returns:
The physical table name.
"""
return self.mapping[model_name]
return self.model_tables[model_name]

def fetchdf(self, query: t.Union[exp.Expression, str]) -> DF:
"""Fetches a dataframe given a sql string or sqlglot expression.
Expand All @@ -113,7 +116,30 @@ def fetchdf(self, query: t.Union[exp.Expression, str]) -> DF:
return self.engine_adapter.fetchdf(query)


class Context(ExecutionContext):
class ExecutionContext(BaseContext):
"""The minimal context needed to execute a model.

Args:
engine_adapter: The engine adapter to execute queries against.
mapping: A mapping of models to physical tables.
"""

def __init__(self, engine_adapter: EngineAdapter, model_tables: t.Dict[str, str]):
self._engine_adapter = engine_adapter
self._model_tables = model_tables

@property
def engine_adapter(self) -> EngineAdapter:
"""Returns an engine adapter."""
return self._engine_adapter

@property
def model_tables(self) -> t.Dict[str, str]:
"""Returns a mapping of model names to tables."""
return self._model_tables


class Context(BaseContext):
"""Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.

Args:
Expand Down Expand Up @@ -178,7 +204,7 @@ def __init__(
ddl_concurrent_tasks or self.config.ddl_concurrent_tasks
)

self.engine_adapter = engine_adapter or EngineAdapter(
self._engine_adapter = engine_adapter or EngineAdapter(
self.config.engine_connection_factory,
self.config.engine_dialect,
multithreaded=self.ddl_concurrent_tasks > 1,
Expand Down Expand Up @@ -211,6 +237,11 @@ def __init__(
if load:
self.load()

@property
def engine_adapter(self) -> EngineAdapter:
"""Returns an engine adapter."""
return self._engine_adapter

def upsert_model(self, model: t.Union[str, Model] = "", **kwargs) -> Model:
"""Update or insert a model.

Expand Down Expand Up @@ -345,7 +376,7 @@ def snapshots(self) -> t.Dict[str, Snapshot]:
return snapshots

@property
def mapping(self) -> t.Dict[str, str]:
def model_tables(self) -> t.Dict[str, str]:
"""Mapping of model name to physical table name.

If a snapshot has not been versioned yet, its view name will be returned.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return view? So that local evaluation works?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, in case you haven't pushed a snapshot yet (because you can run evaluate before plan)

Expand Down Expand Up @@ -439,7 +470,7 @@ def evaluate(
start,
end,
latest,
mapping=self.mapping,
mapping=self.model_tables,
limit=limit,
)

Expand Down