From 1289f4b234080b00e81e0ffccde1d4e17a48c246 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 12 Dec 2022 16:03:06 -0800 Subject: [PATCH 1/5] Create example project durign repo initialization. Various improvements to repo validation --- sqlmesh/cli/__init__.py | 21 ++++++ sqlmesh/cli/example_project.py | 130 +++++++++++++++++++++++++++++++++ sqlmesh/cli/main.py | 35 ++++++--- sqlmesh/cli/options.py | 4 +- sqlmesh/core/audit.py | 48 ++++++++---- sqlmesh/core/context.py | 4 + sqlmesh/core/model.py | 92 ++++++++++++++++++----- sqlmesh/core/snapshot.py | 6 +- sqlmesh/core/test.py | 44 ++++++----- sqlmesh/utils/pydantic.py | 31 ++++++++ 10 files changed, 348 insertions(+), 67 deletions(-) create mode 100644 sqlmesh/cli/example_project.py diff --git a/sqlmesh/cli/__init__.py b/sqlmesh/cli/__init__.py index e69de29bb2..6b3094fb91 100644 --- a/sqlmesh/cli/__init__.py +++ b/sqlmesh/cli/__init__.py @@ -0,0 +1,21 @@ +import typing as t +from functools import wraps + +import click +from sqlglot.errors import SqlglotError + +from sqlmesh.utils.concurrency import NodeExecutionFailedError +from sqlmesh.utils.errors import SQLMeshError + + +def error_handler(func: t.Callable) -> t.Callable: + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except NodeExecutionFailedError as ex: + raise click.ClickException(str(ex.__cause__)) + except (SQLMeshError, SqlglotError) as ex: + raise click.ClickException(str(ex)) + + return wrapper diff --git a/sqlmesh/cli/example_project.py b/sqlmesh/cli/example_project.py new file mode 100644 index 0000000000..a19ccea4e2 --- /dev/null +++ b/sqlmesh/cli/example_project.py @@ -0,0 +1,130 @@ +import typing as t +from enum import Enum +from pathlib import Path + +import click + +DEFAULT_CONFIG = """import duckdb +from sqlmesh.core.config import Config + +config = Config( + engine_connection_factory=duckdb.connect, + engine_dialect="duckdb", +) + + +test_config = config +""" + + +DEFAULT_AIRFLOW_CONFIG = """import duckdb +from sqlmesh.core.config import AirflowSchedulerBackend, Config + +config = Config( + scheduler_backend=AirflowSchedulerBackend( + airflow_url="http://localhost:8080/", + username="airflow", + password="airflow", + ), + backfill_concurrent_tasks=4, + ddl_concurrent_tasks=4, +) + + +test_config = Config( + engine_connection_factory=duckdb.connect, + engine_dialect="duckdb", +) +""" + + +EXAMPLE_MODEL_NAME = "sqlmesh_example.example_model" + + +EXAMPLE_MODEL = f"""MODEL ( + name {EXAMPLE_MODEL_NAME}, + kind full, + cron '@daily' +); + +SELECT + 'dummy_id' AS id +""" + + +EXAMPLE_AUDIT = f"""AUDIT ( + name assert_dummy_id_exists, + model {EXAMPLE_MODEL_NAME} +); + +SELECT * +FROM {EXAMPLE_MODEL_NAME} +WHERE + id != 'dummy_id' +""" + + +EXAMPLE_TEST = f"""test_example_model: + model: {EXAMPLE_MODEL_NAME} + outputs: + query: + rows: + - id: 'dummy_id' +""" + + +class ProjectTemplate(Enum): + AIRFLOW = "airflow" + DEFAULT = "default" + + +def init_example_project( + path: t.Union[str, Path], template: ProjectTemplate = ProjectTemplate.DEFAULT +) -> None: + root_path = Path(path) + config_path = root_path / "config.py" + audits_path = root_path / "audits" + macros_path = root_path / "macros" + models_path = root_path / "models" + tests_path = root_path / "tests" + + if config_path.exists(): + raise click.ClickException(f"Found an existing config in '{config_path}'") + + _create_folders([audits_path, macros_path, models_path, tests_path]) + _create_config(config_path, template) + _create_audits(audits_path) + _create_models(models_path) + _create_tests(tests_path) + + +def _create_folders(target_folders: t.Sequence[Path]) -> None: + for folder_path in target_folders: + folder_path.mkdir() + (folder_path / ".gitkeep").touch() + + +def _create_config(config_path: Path, template: ProjectTemplate) -> None: + _write_file( + config_path, + DEFAULT_AIRFLOW_CONFIG + if template == ProjectTemplate.AIRFLOW + else DEFAULT_CONFIG, + ) + + +def _create_audits(audits_path: Path) -> None: + _write_file(audits_path / f"example_model.sql", EXAMPLE_AUDIT) + + +def _create_models(models_path: Path) -> None: + _write_file(models_path / "example_model.sql", EXAMPLE_MODEL) + + +def _create_tests(tests_path: Path) -> None: + _write_file(tests_path / "test_example_model.yaml", EXAMPLE_TEST) + + +def _write_file(path: Path, payload: str) -> None: + with open(path, "w", encoding="utf-8") as fd: + fd.write(payload) diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 990215592e..defd0c6a1f 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -3,7 +3,9 @@ import click +from sqlmesh.cli import error_handler from sqlmesh.cli import options as opt +from sqlmesh.cli.example_project import ProjectTemplate, init_example_project from sqlmesh.core.context import Context from sqlmesh.core.test import run_all_model_tests, run_model_tests from sqlmesh.utils.date import TimeLike @@ -13,6 +15,7 @@ @opt.path @opt.config @click.pass_context +@error_handler def cli(ctx, path, config=None) -> None: """SQLMesh command line tool.""" path = os.path.abspath(path) @@ -33,18 +36,21 @@ def cli(ctx, path, config=None) -> None: @cli.command("init") +@click.option( + "-t", + "--template", + type=str, + help="Project template. Support values: airflow, default.", +) @click.pass_context -def init(ctx) -> None: +@error_handler +def init(ctx, template: t.Optional[str] = None) -> None: """Create a new SQLMesh repository.""" - path = os.path.join(ctx.obj, "config.py") - if os.path.exists(path): - raise click.ClickException(f"Found an existing config in `{path}`.") - with open(path, "w", encoding="utf8") as file: - file.write( - """from sqlmesh.core.config import Config - -config = Config()""" - ) + try: + project_template = ProjectTemplate(template.lower() if template else "default") + except ValueError: + raise click.ClickException(f"Invalid project template '{template}'") + init_example_project(ctx.obj, template=project_template) @cli.command("render") @@ -54,6 +60,7 @@ def init(ctx) -> None: @opt.latest_time @opt.expand @click.pass_context +@error_handler def render( ctx, model: str, @@ -91,6 +98,7 @@ def render( help="The number of rows which the query should be limited to.", ) @click.pass_context +@error_handler def evaluate( ctx, model: str, @@ -112,6 +120,7 @@ def evaluate( @cli.command("format") @click.pass_context +@error_handler def format(ctx) -> None: """Format all models in a given directory.""" ctx.obj.format() @@ -120,6 +129,7 @@ def format(ctx) -> None: @cli.command("diff") @opt.environment @click.pass_context +@error_handler def diff(ctx, environment: t.Optional[str] = None) -> None: """Show the diff between the current context and a given environment.""" ctx.obj.diff(environment) @@ -149,9 +159,11 @@ def diff(ctx, environment: t.Optional[str] = None) -> None: ) @click.option( "--no_gaps", + is_flag=True, help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.", ) @click.pass_context +@error_handler def plan(ctx, environment: t.Optional[str] = None, **kwargs) -> None: """Plan a migration of the current context's models with the given environment.""" context = ctx.obj @@ -161,6 +173,7 @@ def plan(ctx, environment: t.Optional[str] = None, **kwargs) -> None: @cli.command("dag") @opt.file @click.pass_context +@error_handler def dag(ctx, file) -> None: """ Renders the dag using graphviz. @@ -175,6 +188,7 @@ def dag(ctx, file) -> None: @opt.verbose @click.argument("tests", nargs=-1) @click.pass_obj +@error_handler def test(obj, k, verbose, tests) -> None: """Run model unit tests.""" # Set Python unittest verbosity level @@ -210,6 +224,7 @@ def test(obj, k, verbose, tests) -> None: @opt.end_time @opt.latest_time @click.pass_obj +@error_handler def audit( obj, models: t.Tuple[str], diff --git a/sqlmesh/cli/options.py b/sqlmesh/cli/options.py index 5a30a5eadc..bf48fc37ac 100644 --- a/sqlmesh/cli/options.py +++ b/sqlmesh/cli/options.py @@ -30,7 +30,7 @@ latest_time = click.option( "-l", "--latest", - help="The latest time used for non incremental datasets (defaults to yesterday).", + help="The latest time used for non incremental datasets (defaults to now).", ) expand = click.option( @@ -41,7 +41,7 @@ environment = click.option( "--environment", - help="The environment to diff the current context against.", + help="The environment to diff the current context against. Default: prod", ) file = click.option( diff --git a/sqlmesh/core/audit.py b/sqlmesh/core/audit.py index 0e792e151d..9344df167b 100644 --- a/sqlmesh/core/audit.py +++ b/sqlmesh/core/audit.py @@ -169,6 +169,23 @@ def load( ) raise + provided_meta_fields = {p.name for p in meta.expressions} + + missing_required_fields = AuditMeta.missing_required_fields( + provided_meta_fields + ) + if missing_required_fields: + _raise_config_error( + f"Missing required fields {missing_required_fields} in the audit definition", + path, + ) + + extra_fields = AuditMeta.extra_fields(provided_meta_fields) + if extra_fields: + _raise_config_error( + f"Invalid extra fields {extra_fields} in the audit definition", path + ) + if not isinstance(query, exp.Subqueryable): _raise_config_error("Missing SELECT query in the audit definition", path) raise @@ -176,20 +193,23 @@ def load( if not query.expressions: _raise_config_error("Query missing select statements", path) - audit = cls( - query=query, - expressions=statements, - **{ - "dialect": dialect or "", - **AuditMeta( - **{ - prop.name: prop.args.get("value") - for prop in meta.expressions - if prop - }, - ).dict(), - }, - ) + try: + audit = cls( + query=query, + expressions=statements, + **{ + "dialect": dialect or "", + **AuditMeta( + **{ + prop.name: prop.args.get("value") + for prop in meta.expressions + if prop + }, + ).dict(), + }, + ) + except Exception as ex: + _raise_config_error(str(ex), path) audit._path = path return audit diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index ac9f9a1267..969cee49eb 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -834,6 +834,10 @@ def _load_audits(self) -> None: dialect=self.dialect, ): if not audit.skip: + if audit.model not in self.models: + raise ConfigError( + f"Model '{audit.model}' referenced in the audit '{audit.name}' ({path}) was not found" + ) self.models[audit.model].audits[audit.name] = audit def _import_python_file(self, path: Path) -> t.Optional[types.ModuleType]: diff --git a/sqlmesh/core/model.py b/sqlmesh/core/model.py index 360ef9cf81..284c40aad8 100644 --- a/sqlmesh/core/model.py +++ b/sqlmesh/core/model.py @@ -421,14 +421,28 @@ def _enum_validator(cls, v: t.Any) -> ModelKind: return v name = v.name if isinstance(v, exp.Expression) else str(v) - return ModelKind.__members__[name.upper()] + try: + return ModelKind(name.lower()) + except ValueError: + _raise_config_error(f"Invalid model kind '{name}'") + raise - @validator("dialect", "owner", "cron", "storage_format", "description", pre=True) + @validator("dialect", "owner", "storage_format", "description", pre=True) def _string_validator(cls, v: t.Any) -> t.Optional[str]: if isinstance(v, exp.Expression): return v.name return str(v) if v is not None else None + @validator("cron", pre=True) + def _cron_validator(cls, v: t.Any) -> t.Optional[str]: + cron = cls._string_validator(v) + if cron: + try: + croniter(cron) + except Exception: + raise ConfigError(f"Invalid cron expression '{cron}'") + return cron + @validator("columns_", pre=True) def _columns_validator(cls, v: t.Any) -> t.Optional[t.Dict[str, exp.DataType]]: if isinstance(v, exp.Schema): @@ -461,9 +475,15 @@ def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]: @validator("batch_size", pre=True) def _int_validator(cls, v: t.Any) -> t.Optional[int]: - if isinstance(v, exp.Expression): - return int(v.name) - return int(v) if v is not None else None + if not isinstance(v, exp.Expression): + batch_size = int(v) if v is not None else None + else: + batch_size = int(v.name) + if batch_size is not None and batch_size <= 0: + raise ConfigError( + f"Invalid batch size {batch_size}. The value should be greater than 0" + ) + return batch_size @root_validator def _kind_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: @@ -682,26 +702,55 @@ def load( path, ) + provided_meta_fields = {p.name for p in meta.expressions} + + missing_required_fields = ModelMeta.missing_required_fields( + provided_meta_fields + ) + if missing_required_fields: + _raise_config_error( + f"Missing required fields {missing_required_fields} in the model definition", + path, + ) + + extra_fields = ModelMeta.extra_fields(provided_meta_fields) + if extra_fields: + _raise_config_error( + f"Invalid extra fields {extra_fields} in the model definition", path + ) + if not isinstance(query, (exp.Subqueryable, d.MacroVar, d.Jinja)): _raise_config_error( "A query is required and must be a SELECT or UNION statement.", path, ) - model = cls( - query=query, - expressions=statements, - python_env=_python_env(query, module, macros or macro.get_registry()), - **{ - "dialect": dialect or "", - **ModelMeta( - **{prop.name: prop.args.get("value") for prop in meta.expressions}, - description="\n".join(comment.strip() for comment in meta.comments) - if meta.comments - else None, - ).dict(exclude_defaults=True), - }, - ) + if not query.expressions: + _raise_config_error("Query missing select statements", path) + raise + + try: + model = cls( + query=query, + expressions=statements, + python_env=_python_env(query, module, macros or macro.get_registry()), + **{ + "dialect": dialect or "", + **ModelMeta( + **{ + prop.name: prop.args.get("value") + for prop in meta.expressions + }, + description="\n".join( + comment.strip() for comment in meta.comments + ) + if meta.comments + else None, + ).dict(exclude_defaults=True), + }, + ) + except Exception as ex: + _raise_config_error(str(ex), location=path) model._path = path model.set_time_format(time_column_format) @@ -1233,6 +1282,9 @@ def validate_definition(self) -> None: self._path, ) + def _validate_view(self, query: exp.Expression) -> None: + pass + def _filter_time_column( self, query: exp.Select, start: TimeLike, end: TimeLike ) -> None: @@ -1391,5 +1443,5 @@ def _python_env( def _raise_config_error(msg: str, location: t.Optional[str | Path] = None) -> None: if location: - raise ConfigError(f"{msg}: '{location}'") + raise ConfigError(f"{msg} at '{location}'") raise ConfigError(msg) diff --git a/sqlmesh/core/snapshot.py b/sqlmesh/core/snapshot.py index f235ffaa0c..48c7ff3c76 100644 --- a/sqlmesh/core/snapshot.py +++ b/sqlmesh/core/snapshot.py @@ -124,9 +124,9 @@ def for_environment(self, environment: str) -> str: if p is not None ) - def schema_for_environment(self, environment: str) -> t.Optional[str]: - schema = self.schema_name - if schema is not None and environment.lower() != c.PROD: + def schema_for_environment(self, environment: str) -> str: + schema = self.schema_name or "default" + if environment.lower() != c.PROD: schema = f"{schema}__{environment}" return schema diff --git a/sqlmesh/core/test.py b/sqlmesh/core/test.py index d65f3fd978..6a435c5352 100644 --- a/sqlmesh/core/test.py +++ b/sqlmesh/core/test.py @@ -132,6 +132,7 @@ from sqlmesh.core.engine_adapter import EngineAdapter from sqlmesh.core.snapshot import Snapshot, table_name from sqlmesh.utils import unique +from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.pydantic import PydanticModel from sqlmesh.utils.yaml import yaml @@ -149,7 +150,7 @@ def __hash__(self) -> int: return self.fully_qualified_test_name.__hash__() -class TestError(Exception): +class TestError(SQLMeshError): """Test error""" @@ -173,18 +174,22 @@ def __init__( engine_adapter: The engine adapter to use. path: An optional path to the test definition yaml file """ - if "inputs" not in body: - self._raise_error("Incomplete test, missing inputs") - if "outputs" not in body: - self._raise_error("Incomplete test, missing outputs") - self.body = body self.path = path self.test_name = test_name - self.model_name = body["model"] self.engine_adapter = engine_adapter + if "model" not in body: + self._raise_error("Incomplete test, missing model name") + + if "outputs" not in body: + self._raise_error("Incomplete test, missing outputs") + + self.model_name = body["model"] + if self.model_name not in snapshots: + self._raise_error(f"Model '{self.model_name}' was not found") + self.snapshot = snapshots[self.model_name] # For tests we just use the model name for the table reference and we don't want to expand mapping = {name: name for name in snapshots} @@ -218,18 +223,21 @@ def setUp(self) -> None: ) for snapshot_id in self.snapshot.parents: - if snapshot_id.name in table: - self.view_names.append( - table_name( - self.snapshot.physical_schema, - snapshot_id.name, - snapshot_id.fingerprint, - ) + if snapshot_id.name not in inputs: + self._raise_error( + f"Incomplete test, missing input for table {snapshot_id.name}" ) - self.engine_adapter.create_view( - self.view_names[-1], - parse_one(f"SELECT * FROM {snapshot_id.name}"), # type: ignore + self.view_names.append( + table_name( + self.snapshot.physical_schema, + snapshot_id.name, + snapshot_id.fingerprint, ) + ) + self.engine_adapter.create_view( + self.view_names[-1], + parse_one(f"SELECT * FROM {snapshot_id.name}"), # type: ignore + ) def tearDown(self) -> None: """Drop all input tables""" @@ -286,7 +294,7 @@ def __str__(self): return f"{self.test_name} ({self.path}:{self.body.lc.line})" def _raise_error(self, msg: str) -> None: - raise TestError(f"{msg}: {self.path}") + raise TestError(f"{msg} at {self.path}") def load_model_test_file( diff --git a/sqlmesh/utils/pydantic.py b/sqlmesh/utils/pydantic.py index 7175c6d3bd..e9bbd11fdc 100644 --- a/sqlmesh/utils/pydantic.py +++ b/sqlmesh/utils/pydantic.py @@ -25,3 +25,34 @@ def json( **kwargs, ) -> str: return super().json(**{**DEFAULT_ARGS, **kwargs}) # type: ignore + + @classmethod + def missing_required_fields( + cls: t.Type["PydanticModel"], provided_fields: t.Set[str] + ) -> t.Set[str]: + return cls.required_fields() - provided_fields + + @classmethod + def extra_fields( + cls: t.Type["PydanticModel"], provided_fields: t.Set[str] + ) -> t.Set[str]: + return provided_fields - cls.all_fields() + + @classmethod + def all_fields(cls: t.Type["PydanticModel"]) -> t.Set[str]: + return cls._fields() + + @classmethod + def required_fields(cls: t.Type["PydanticModel"]) -> t.Set[str]: + return cls._fields(lambda field: field.required) + + @classmethod + def _fields( + cls: t.Type["PydanticModel"], + predicate: t.Callable[[t.Any], bool] = lambda _: True, + ) -> t.Set[str]: + return { + field.alias if field.alias else field.name + for field in cls.__fields__.values() + if predicate(field) + } From a85cb13479cc748644d1d10df4fa0f4c60283f5b Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 12 Dec 2022 19:47:43 -0800 Subject: [PATCH 2/5] Make sure that the start date / time doesn't exceed the end date / time --- sqlmesh/cli/__init__.py | 2 +- sqlmesh/core/model.py | 3 +-- sqlmesh/core/plan.py | 3 ++- sqlmesh/core/scheduler.py | 11 ++++++++++- sqlmesh/utils/date.py | 10 ++++++++++ 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/sqlmesh/cli/__init__.py b/sqlmesh/cli/__init__.py index 6b3094fb91..453c918a61 100644 --- a/sqlmesh/cli/__init__.py +++ b/sqlmesh/cli/__init__.py @@ -15,7 +15,7 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) except NodeExecutionFailedError as ex: raise click.ClickException(str(ex.__cause__)) - except (SQLMeshError, SqlglotError) as ex: + except (SQLMeshError, SqlglotError, ValueError) as ex: raise click.ClickException(str(ex)) return wrapper diff --git a/sqlmesh/core/model.py b/sqlmesh/core/model.py index 284c40aad8..e1fdb668f3 100644 --- a/sqlmesh/core/model.py +++ b/sqlmesh/core/model.py @@ -470,7 +470,7 @@ def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]: if isinstance(v, exp.Expression): v = v.name if not to_datetime(v): - raise ConfigError(f"{v} not a valid date time") + raise ConfigError(f"'{v}' not a valid date time") return v @validator("batch_size", pre=True) @@ -727,7 +727,6 @@ def load( if not query.expressions: _raise_config_error("Query missing select statements", path) - raise try: model = cls( diff --git a/sqlmesh/core/plan.py b/sqlmesh/core/plan.py index 92ba965a19..6b8d0eb861 100644 --- a/sqlmesh/core/plan.py +++ b/sqlmesh/core/plan.py @@ -45,7 +45,7 @@ ) from sqlmesh.core.state_sync import StateReader from sqlmesh.utils import random_id -from sqlmesh.utils.date import TimeLike, make_inclusive, now, to_ds +from sqlmesh.utils.date import TimeLike, make_inclusive, now, to_ds, validate_date_range from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.pydantic import PydanticModel @@ -206,6 +206,7 @@ def apply(self) -> None: """Runs apply if an apply function was passed in.""" if not self._apply: raise SQLMeshError(f"Plan was not initialized with an applier.") + validate_date_range(self.start, self.end) self._apply(self) def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> None: diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 32acc8b3b3..a308be002a 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -11,7 +11,13 @@ from sqlmesh.core.snapshot_evaluator import SnapshotEvaluator from sqlmesh.core.state_sync import StateSync from sqlmesh.utils.concurrency import NodeExecutionFailedError, concurrent_apply_to_dag -from sqlmesh.utils.date import TimeLike, now, to_datetime, yesterday +from sqlmesh.utils.date import ( + TimeLike, + now, + to_datetime, + validate_date_range, + yesterday, +) logger = logging.getLogger(__name__) SnapshotBatches = t.List[t.Tuple[Snapshot, t.List[t.Tuple[datetime, datetime]]]] @@ -66,6 +72,7 @@ def evaluate( latest: The latest datetime to use for non-incremental queries. kwargs: Additional kwargs to pass to the renderer. """ + validate_date_range(start, end) mapping = { **{ @@ -107,6 +114,8 @@ def run( end: The end of the run. Defaults to now. latest: The latest datetime to use for non-incremental queries. """ + validate_date_range(start, end) + latest = latest or now() batches = self.interval_params(self.snapshots.values(), start, end, latest) diff --git a/sqlmesh/utils/date.py b/sqlmesh/utils/date.py index 70cd64e46c..c22f57c2eb 100644 --- a/sqlmesh/utils/date.py +++ b/sqlmesh/utils/date.py @@ -242,3 +242,13 @@ def preserve_time_like_kind(input_value: TimeLike, output_value: TimeLike) -> Ti if is_date(input_value): return to_date(output_value) return output_value + + +def validate_date_range( + start: t.Optional[TimeLike], + end: t.Optional[TimeLike], +) -> None: + if start and end and to_datetime(start) > to_datetime(end): + raise ValueError( + f"Start date / time ({start}) can't be greater than end date / time ({end})" + ) From 1d2029bbb2ee93165e62957a33769edeefa30e90 Mon Sep 17 00:00:00 2001 From: eakmanrq Date: Tue, 13 Dec 2022 11:48:44 -0800 Subject: [PATCH 3/5] add incremental to full with dependency --- sqlmesh/cli/example_project.py | 77 ++++++++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/sqlmesh/cli/example_project.py b/sqlmesh/cli/example_project.py index a19ccea4e2..aea9ea5189 100644 --- a/sqlmesh/cli/example_project.py +++ b/sqlmesh/cli/example_project.py @@ -37,39 +37,86 @@ ) """ +EXAMPLE_SCHEMA_NAME = "sqlmesh_example" +EXAMPLE_FULL_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.example_full_model" +EXAMPLE_INCREMENTAL_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.example_incremental_model" -EXAMPLE_MODEL_NAME = "sqlmesh_example.example_model" - -EXAMPLE_MODEL = f"""MODEL ( - name {EXAMPLE_MODEL_NAME}, +EXAMPLE_FULL_MODEL_DEF = f"""MODEL ( + name {EXAMPLE_FULL_MODEL_NAME}, kind full, - cron '@daily' + cron '@daily', ); SELECT - 'dummy_id' AS id + item_id, + count(distinct id) AS num_orders, +FROM + {EXAMPLE_INCREMENTAL_MODEL_NAME} +GROUP BY item_id """ +EXAMPLE_INCREMENTAL_MODEL_DEF = f"""MODEL ( + name {EXAMPLE_INCREMENTAL_MODEL_NAME}, + kind incremental, + time_column ds, + start '2020-01-01', + batch_size 1, + cron '@daily', +); + +SELECT + id, + item_id, + ds, +FROM + (VALUES + (1, 1, '2020-01-01'), + (1, 2, '2020-01-01'), + (2, 1, '2020-01-01'), + (3, 3, '2020-01-03'), + (4, 1, '2020-01-04'), + (5, 1, '2020-01-05'), + (6, 1, '2020-01-06'), + (7, 1, '2020-01-07') + ) AS t (id, item_id, ds) +WHERE + ds between @start_ds and @end_ds +""" EXAMPLE_AUDIT = f"""AUDIT ( - name assert_dummy_id_exists, - model {EXAMPLE_MODEL_NAME} + name asset_positive_order_ids, + model {EXAMPLE_FULL_MODEL_NAME} ); SELECT * -FROM {EXAMPLE_MODEL_NAME} +FROM {EXAMPLE_FULL_MODEL_NAME} WHERE - id != 'dummy_id' + item_id < 0 """ EXAMPLE_TEST = f"""test_example_model: - model: {EXAMPLE_MODEL_NAME} + model: {EXAMPLE_FULL_MODEL_NAME} + inputs: + {EXAMPLE_INCREMENTAL_MODEL_NAME}: + rows: + - id: 1 + item_id: 1 + ds: '2020-01-01' + - id: 2 + item_id: 1 + ds: '2020-01-02' + - id: 3 + item_id: 2 + ds: '2020-01-03' outputs: query: rows: - - id: 'dummy_id' + - item_id: 1 + num_orders: 2 + - item_id: 2 + num_orders: 1 """ @@ -118,7 +165,11 @@ def _create_audits(audits_path: Path) -> None: def _create_models(models_path: Path) -> None: - _write_file(models_path / "example_model.sql", EXAMPLE_MODEL) + for model_name, model_def in [ + (EXAMPLE_FULL_MODEL_NAME, EXAMPLE_FULL_MODEL_DEF), + (EXAMPLE_INCREMENTAL_MODEL_NAME, EXAMPLE_INCREMENTAL_MODEL_DEF), + ]: + _write_file(models_path / f"{model_name.split('.')[-1]}.sql", model_def) def _create_tests(tests_path: Path) -> None: From 7f339431ed8948620156a1b28558ee4f87fa4865 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 13 Dec 2022 12:28:15 -0800 Subject: [PATCH 4/5] In scheduler start snapshot progress in the console --- sqlmesh/core/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index a308be002a..f99a6f1f66 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -133,6 +133,7 @@ def run( sid = snapshot.snapshot_id for interval in intervals: dag.add((sid, interval), upstream_dependencies) + self.console.start_snapshot_progress(snapshot.name, len(intervals)) def evaluate_node(node: SchedulingUnit) -> None: assert latest From 627f0fc393a94afe2b3ad87143361eff2c024ace Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 13 Dec 2022 12:35:23 -0800 Subject: [PATCH 5/5] Rename files in the generate example project --- sqlmesh/cli/example_project.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlmesh/cli/example_project.py b/sqlmesh/cli/example_project.py index aea9ea5189..6d4f4c7811 100644 --- a/sqlmesh/cli/example_project.py +++ b/sqlmesh/cli/example_project.py @@ -70,7 +70,7 @@ item_id, ds, FROM - (VALUES + (VALUES (1, 1, '2020-01-01'), (1, 2, '2020-01-01'), (2, 1, '2020-01-01'), @@ -96,7 +96,7 @@ """ -EXAMPLE_TEST = f"""test_example_model: +EXAMPLE_TEST = f"""test_example_full_model: model: {EXAMPLE_FULL_MODEL_NAME} inputs: {EXAMPLE_INCREMENTAL_MODEL_NAME}: @@ -161,7 +161,7 @@ def _create_config(config_path: Path, template: ProjectTemplate) -> None: def _create_audits(audits_path: Path) -> None: - _write_file(audits_path / f"example_model.sql", EXAMPLE_AUDIT) + _write_file(audits_path / "example_full_model.sql", EXAMPLE_AUDIT) def _create_models(models_path: Path) -> None: @@ -173,7 +173,7 @@ def _create_models(models_path: Path) -> None: def _create_tests(tests_path: Path) -> None: - _write_file(tests_path / "test_example_model.yaml", EXAMPLE_TEST) + _write_file(tests_path / "test_example_full_model.yaml", EXAMPLE_TEST) def _write_file(path: Path, payload: str) -> None: