Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sqlmesh/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import typing as t
from functools import wraps

import click
from sqlglot.errors import SqlglotError

from sqlmesh.utils.concurrency import NodeExecutionFailedError
from sqlmesh.utils.errors import SQLMeshError


def error_handler(func: t.Callable) -> t.Callable:
    """Decorate a CLI callback so known failures are reported as click errors.

    The wrapped callable is invoked with the arguments it was given and its
    return value is passed through unchanged.

    Args:
        func: The callback to wrap.

    Returns:
        A wrapper that preserves ``func``'s metadata (via ``functools.wraps``).

    Raises:
        click.ClickException: When ``func`` raises ``NodeExecutionFailedError``,
            ``SQLMeshError``, ``SqlglotError`` or ``ValueError``. The original
            exception is kept as the ``__cause__`` of the click exception.
    """

    @wraps(func)
    def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
        try:
            return func(*args, **kwargs)
        except NodeExecutionFailedError as ex:
            # Report the underlying cause when there is one; otherwise fall back
            # to the error itself so the message is never the literal "None".
            cause = ex.__cause__
            message = str(cause) if cause is not None else str(ex)
            raise click.ClickException(message) from ex
        except (SQLMeshError, SqlglotError, ValueError) as ex:
            raise click.ClickException(str(ex)) from ex

    return wrapper
181 changes: 181 additions & 0 deletions sqlmesh/cli/example_project.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import typing as t
from enum import Enum
from pathlib import Path

import click

# Text written to the project's config.py by _create_config for every template
# other than ProjectTemplate.AIRFLOW. The generated module builds a DuckDB-backed
# Config and reuses the same object as test_config.
DEFAULT_CONFIG = """import duckdb
from sqlmesh.core.config import Config

config = Config(
engine_connection_factory=duckdb.connect,
engine_dialect="duckdb",
)


test_config = config
"""


# Text written to the project's config.py by _create_config when the template is
# ProjectTemplate.AIRFLOW. The generated `config` uses an AirflowSchedulerBackend
# pointed at http://localhost:8080/ with airflow/airflow credentials, while
# `test_config` is a separate DuckDB-backed Config.
DEFAULT_AIRFLOW_CONFIG = """import duckdb
from sqlmesh.core.config import AirflowSchedulerBackend, Config

config = Config(
scheduler_backend=AirflowSchedulerBackend(
airflow_url="http://localhost:8080/",
username="airflow",
password="airflow",
),
backfill_concurrent_tasks=4,
ddl_concurrent_tasks=4,
)


test_config = Config(
engine_connection_factory=duckdb.connect,
engine_dialect="duckdb",
)
"""

# Schema and schema-qualified model names interpolated into the example model,
# audit and test templates below. _create_models uses the part after the last
# "." as the model's file name.
EXAMPLE_SCHEMA_NAME = "sqlmesh_example"
EXAMPLE_FULL_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.example_full_model"
EXAMPLE_INCREMENTAL_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.example_incremental_model"


# Definition of the example "full" model, written by _create_models. Its query
# selects from the example incremental model and counts distinct ids per item_id,
# which gives the two example models a dependency.
EXAMPLE_FULL_MODEL_DEF = f"""MODEL (
name {EXAMPLE_FULL_MODEL_NAME},
kind full,
cron '@daily',
);

SELECT
item_id,
count(distinct id) AS num_orders,
FROM
{EXAMPLE_INCREMENTAL_MODEL_NAME}
GROUP BY item_id
"""

# Definition of the example "incremental" model, written by _create_models. The
# query reads a fixed inline VALUES list of (id, item_id, ds) rows and filters it
# to the range between the @start_ds and @end_ds macro variables.
EXAMPLE_INCREMENTAL_MODEL_DEF = f"""MODEL (
name {EXAMPLE_INCREMENTAL_MODEL_NAME},
kind incremental,
time_column ds,
start '2020-01-01',
batch_size 1,
cron '@daily',
);

SELECT
id,
item_id,
ds,
FROM
(VALUES
(1, 1, '2020-01-01'),
(1, 2, '2020-01-01'),
(2, 1, '2020-01-01'),
(3, 3, '2020-01-03'),
(4, 1, '2020-01-04'),
(5, 1, '2020-01-05'),
(6, 1, '2020-01-06'),
(7, 1, '2020-01-07')
) AS t (id, item_id, ds)
WHERE
ds between @start_ds and @end_ds
"""

# Audit definition for the example full model, written by _create_audits to
# example_full_model.sql. The query selects rows whose item_id is negative.
# NOTE(review): the audit name "asset_positive_order_ids" looks like a typo for
# "assert_positive_order_ids" — confirm before renaming, since the name is part
# of the generated project.
EXAMPLE_AUDIT = f"""AUDIT (
name asset_positive_order_ids,
model {EXAMPLE_FULL_MODEL_NAME}
);

SELECT *
FROM {EXAMPLE_FULL_MODEL_NAME}
WHERE
item_id < 0
"""


# YAML unit test for the example full model, written by _create_tests to
# test_example_full_model.yaml. It supplies three input rows for the incremental
# model and lists the expected num_orders per item_id.
EXAMPLE_TEST = f"""test_example_full_model:
model: {EXAMPLE_FULL_MODEL_NAME}
inputs:
{EXAMPLE_INCREMENTAL_MODEL_NAME}:
rows:
- id: 1
item_id: 1
ds: '2020-01-01'
- id: 2
item_id: 1
ds: '2020-01-02'
- id: 3
item_id: 2
ds: '2020-01-03'
outputs:
query:
rows:
- item_id: 1
num_orders: 2
- item_id: 2
num_orders: 1
"""


class ProjectTemplate(Enum):
    """Templates available when scaffolding an example project.

    The member values are the lowercase strings used to look a template up by
    value (e.g. ``ProjectTemplate("airflow")``).
    """

    # _create_config writes DEFAULT_AIRFLOW_CONFIG for this template.
    AIRFLOW = "airflow"
    # _create_config writes DEFAULT_CONFIG for this template.
    DEFAULT = "default"


def init_example_project(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tobymao @eakmanrq can you please help make sure that this is a reasonable enough dummy project to initialize the repo with?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to expand this a bit to include two models (1 incremental and 1 full) and have a dependency between the incremental and full. The thinking is to provide two examples with a dependency.

path: t.Union[str, Path], template: ProjectTemplate = ProjectTemplate.DEFAULT
) -> None:
root_path = Path(path)
config_path = root_path / "config.py"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omg i didn't know you could do this

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I will start using this.

audits_path = root_path / "audits"
macros_path = root_path / "macros"
models_path = root_path / "models"
tests_path = root_path / "tests"

if config_path.exists():
raise click.ClickException(f"Found an existing config in '{config_path}'")

_create_folders([audits_path, macros_path, models_path, tests_path])
_create_config(config_path, template)
_create_audits(audits_path)
_create_models(models_path)
_create_tests(tests_path)


def _create_folders(target_folders: t.Sequence[Path]) -> None:
for folder_path in target_folders:
folder_path.mkdir()
(folder_path / ".gitkeep").touch()


def _create_config(config_path: Path, template: ProjectTemplate) -> None:
    """Write the config module that corresponds to ``template``.

    Args:
        config_path: Destination file for the generated config.
        template: Selects the Airflow config text when it equals
            ``ProjectTemplate.AIRFLOW``; any other value gets the default text.
    """
    if template == ProjectTemplate.AIRFLOW:
        payload = DEFAULT_AIRFLOW_CONFIG
    else:
        payload = DEFAULT_CONFIG
    _write_file(config_path, payload)


def _create_audits(audits_path: Path) -> None:
    """Write the example audit definition into ``audits_path``."""
    audit_file = audits_path / "example_full_model.sql"
    _write_file(audit_file, EXAMPLE_AUDIT)


def _create_models(models_path: Path) -> None:
    """Write one ``.sql`` file per example model into ``models_path``.

    Each file is named after the final dot-separated segment of the model's
    qualified name.
    """
    definitions = {
        EXAMPLE_FULL_MODEL_NAME: EXAMPLE_FULL_MODEL_DEF,
        EXAMPLE_INCREMENTAL_MODEL_NAME: EXAMPLE_INCREMENTAL_MODEL_DEF,
    }
    for qualified_name, definition in definitions.items():
        # rpartition keeps everything after the last "." (or the whole name
        # when there is no dot), i.e. the model name without its schema.
        file_stem = qualified_name.rpartition(".")[2]
        _write_file(models_path / f"{file_stem}.sql", definition)


def _create_tests(tests_path: Path) -> None:
    """Write the example model unit test into ``tests_path``."""
    test_file = tests_path / "test_example_full_model.yaml"
    _write_file(test_file, EXAMPLE_TEST)


def _write_file(path: Path, payload: str) -> None:
with open(path, "w", encoding="utf-8") as fd:
fd.write(payload)
35 changes: 25 additions & 10 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import click

from sqlmesh.cli import error_handler
from sqlmesh.cli import options as opt
from sqlmesh.cli.example_project import ProjectTemplate, init_example_project
from sqlmesh.core.context import Context
from sqlmesh.core.test import run_all_model_tests, run_model_tests
from sqlmesh.utils.date import TimeLike
Expand All @@ -13,6 +15,7 @@
@opt.path
@opt.config
@click.pass_context
@error_handler
def cli(ctx, path, config=None) -> None:
"""SQLMesh command line tool."""
path = os.path.abspath(path)
Expand All @@ -33,18 +36,21 @@ def cli(ctx, path, config=None) -> None:


@cli.command("init")
@click.option(
"-t",
"--template",
type=str,
help="Project template. Support values: airflow, default.",
)
@click.pass_context
def init(ctx) -> None:
@error_handler
def init(ctx, template: t.Optional[str] = None) -> None:
"""Create a new SQLMesh repository."""
path = os.path.join(ctx.obj, "config.py")
if os.path.exists(path):
raise click.ClickException(f"Found an existing config in `{path}`.")
with open(path, "w", encoding="utf8") as file:
file.write(
"""from sqlmesh.core.config import Config

config = Config()"""
)
try:
project_template = ProjectTemplate(template.lower() if template else "default")
except ValueError:
raise click.ClickException(f"Invalid project template '{template}'")
init_example_project(ctx.obj, template=project_template)


@cli.command("render")
Expand All @@ -54,6 +60,7 @@ def init(ctx) -> None:
@opt.latest_time
@opt.expand
@click.pass_context
@error_handler
def render(
ctx,
model: str,
Expand Down Expand Up @@ -91,6 +98,7 @@ def render(
help="The number of rows which the query should be limited to.",
)
@click.pass_context
@error_handler
def evaluate(
ctx,
model: str,
Expand All @@ -112,6 +120,7 @@ def evaluate(

@cli.command("format")
@click.pass_context
@error_handler
def format(ctx) -> None:
"""Format all models in a given directory."""
ctx.obj.format()
Expand All @@ -120,6 +129,7 @@ def format(ctx) -> None:
@cli.command("diff")
@opt.environment
@click.pass_context
@error_handler
def diff(ctx, environment: t.Optional[str] = None) -> None:
"""Show the diff between the current context and a given environment."""
ctx.obj.diff(environment)
Expand Down Expand Up @@ -149,9 +159,11 @@ def diff(ctx, environment: t.Optional[str] = None) -> None:
)
@click.option(
"--no_gaps",
is_flag=True,
help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
)
@click.pass_context
@error_handler
def plan(ctx, environment: t.Optional[str] = None, **kwargs) -> None:
"""Plan a migration of the current context's models with the given environment."""
context = ctx.obj
Expand All @@ -161,6 +173,7 @@ def plan(ctx, environment: t.Optional[str] = None, **kwargs) -> None:
@cli.command("dag")
@opt.file
@click.pass_context
@error_handler
def dag(ctx, file) -> None:
"""
Renders the dag using graphviz.
Expand All @@ -175,6 +188,7 @@ def dag(ctx, file) -> None:
@opt.verbose
@click.argument("tests", nargs=-1)
@click.pass_obj
@error_handler
def test(obj, k, verbose, tests) -> None:
"""Run model unit tests."""
# Set Python unittest verbosity level
Expand Down Expand Up @@ -210,6 +224,7 @@ def test(obj, k, verbose, tests) -> None:
@opt.end_time
@opt.latest_time
@click.pass_obj
@error_handler
def audit(
obj,
models: t.Tuple[str],
Expand Down
4 changes: 2 additions & 2 deletions sqlmesh/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
latest_time = click.option(
"-l",
"--latest",
help="The latest time used for non incremental datasets (defaults to yesterday).",
help="The latest time used for non incremental datasets (defaults to now).",
)

expand = click.option(
Expand All @@ -41,7 +41,7 @@

environment = click.option(
"--environment",
help="The environment to diff the current context against.",
help="The environment to diff the current context against. Default: prod",
)

file = click.option(
Expand Down
Loading