diff --git a/docs/concepts/models/model_kinds.md b/docs/concepts/models/model_kinds.md index cd44f7e2fa..e777ad0eba 100644 --- a/docs/concepts/models/model_kinds.md +++ b/docs/concepts/models/model_kinds.md @@ -318,7 +318,7 @@ In addition to specifying a time column in the `MODEL` DDL, the model's query mu A model's `time_column` should be in the [UTC time zone](https://en.wikipedia.org/wiki/Coordinated_Universal_Time) to ensure correct interaction with SQLMesh's scheduler and predefined macro variables. - This requirement aligns with the data engineering best practice of converting datetime/timestamp columns to UTC as soon as they are ingested into the data system and only converting them to local timezones when they exit the system for downstream uses. + This requirement aligns with the data engineering best practice of converting datetime/timestamp columns to UTC as soon as they are ingested into the data system and only converting them to local timezones when they exit the system for downstream uses. Setting the model's `cron_tz` property **does not** change this requirement; it only affects when the model is scheduled to run. Placing all timezone conversion code in the system's first/last transformation models prevents inadvertent timezone-related errors as data flows between models. @@ -1598,4 +1598,4 @@ Since it's unmanaged, it doesnt support the `batch_size` and `batch_concurrency` Similar to `INCREMENTAL_BY_PARTITION`, attempting to [restate](../plans.md#restatement-plans) an `INCREMENTAL_UNMANAGED` model will trigger a full restatement. That is, the model will be rebuilt from scratch rather than from a time slice you specify. - This is because an append-only table is inherently non-idempotent. Restating `INCREMENTAL_UNMANAGED` models may lead to data loss and should be performed with care. \ No newline at end of file + This is because an append-only table is inherently non-idempotent. Restating `INCREMENTAL_UNMANAGED` models may lead to data loss and should be performed with care. 
diff --git a/docs/concepts/models/overview.md b/docs/concepts/models/overview.md index c14f2977e1..40e8eac63c 100644 --- a/docs/concepts/models/overview.md +++ b/docs/concepts/models/overview.md @@ -227,7 +227,10 @@ Learn more about these properties and their default values in the [model configu : Tags are one or more labels used to organize your models. ### cron -: Cron is used to schedule when your model processes or refreshes data. It accepts a [cron expression](https://en.wikipedia.org/wiki/Cron) or any of `@hourly`, `@daily`, `@weekly`, or `@monthly`. All times are assumed to be UTC timezone - it is not possible to specify them in a different timezone. +: Cron is used to schedule when your model processes or refreshes data. It accepts a [cron expression](https://en.wikipedia.org/wiki/Cron) or any of `@hourly`, `@daily`, `@weekly`, or `@monthly`. All times are assumed to be in the UTC time zone by default. + +### cron_tz +: Cron timezone specifies the time zone in which the `cron` expression is evaluated, given as an IANA time zone name (defaults to UTC). It is only used for scheduling and does not affect the intervals processed in an incremental model. For example, if a model is `@daily` with `cron_tz` set to `America/Los_Angeles`, it will run every day at 12 AM Pacific time; however, the `start` and `end` variables passed to the incremental model will still represent the UTC date boundaries. ### interval_unit : Interval unit determines the temporal granularity with which time intervals are calculated for the model. 
diff --git a/sqlmesh/core/audit/definition.py b/sqlmesh/core/audit/definition.py index bff8938069..088e33a9ca 100644 --- a/sqlmesh/core/audit/definition.py +++ b/sqlmesh/core/audit/definition.py @@ -270,6 +270,8 @@ def metadata_hash(self) -> str: *sorted(self.tags), str(self.sorted_python_env), self.stamp, + self.cron, + self.cron_tz.key if self.cron_tz else None, ] query = self.render_audit_query() or self.query diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index 5ff74196eb..9945b61090 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -1086,6 +1086,7 @@ def metadata_hash(self) -> str: self.description, json.dumps(self.column_descriptions, sort_keys=True), self.cron, + self.cron_tz.key if self.cron_tz else None, str(self.start) if self.start else None, str(self.end) if self.end else None, str(self.retention) if self.retention else None, diff --git a/sqlmesh/core/node.py b/sqlmesh/core/node.py index 7526fde846..98a24884cd 100644 --- a/sqlmesh/core/node.py +++ b/sqlmesh/core/node.py @@ -1,6 +1,7 @@ from __future__ import annotations import typing as t +import zoneinfo from datetime import datetime from enum import Enum from pathlib import Path @@ -177,6 +178,7 @@ class _Node(PydanticModel): the date from the scheduler will be used cron: A cron string specifying how often the node should be run, leveraging the [croniter](https://github.com/kiorky/croniter) library. + cron_tz: Time zone for the cron, defaults to utc, [IANA time zones](https://docs.python.org/3/library/zoneinfo.html). interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression. tags: A list of tags that can be used to filter nodes. 
stamp: An optional arbitrary string sequence used to create new node versions without making @@ -190,6 +192,7 @@ class _Node(PydanticModel): start: t.Optional[TimeLike] = None end: t.Optional[TimeLike] = None cron: SQLGlotCron = "@daily" + cron_tz: t.Optional[zoneinfo.ZoneInfo] = None interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None) tags: t.List[str] = [] stamp: t.Optional[str] = None @@ -226,6 +229,27 @@ def _name_validator(cls, v: t.Any) -> t.Optional[str]: return v.meta["sql"] return str(v) + @field_validator("cron_tz", mode="before") + def _cron_tz_validator(cls, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]: + if not v or v == "UTC": + return None + + v = str_or_exp_to_str(v) + + try: + return zoneinfo.ZoneInfo(v) + except Exception as e: + available_timezones = zoneinfo.available_timezones() + + if available_timezones: + raise ConfigError(f"{e}. {v} must be in {available_timezones}.") + else: + raise ConfigError( + f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC." 
+ ) + + return None + @field_validator("start", "end", mode="before") @classmethod def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]: @@ -317,9 +341,9 @@ def metadata_hash(self) -> str: def croniter(self, value: TimeLike) -> CroniterCache: if self._croniter is None: - self._croniter = CroniterCache(self.cron, value) + self._croniter = CroniterCache(self.cron, value, tz=self.cron_tz) else: - self._croniter.curr = to_datetime(value) + self._croniter.curr = to_datetime(value, tz=self.cron_tz) return self._croniter def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime: diff --git a/sqlmesh/migrations/v0076_add_cron_tz.py b/sqlmesh/migrations/v0076_add_cron_tz.py new file mode 100644 index 0000000000..cfc393a4a6 --- /dev/null +++ b/sqlmesh/migrations/v0076_add_cron_tz.py @@ -0,0 +1,5 @@ +"""Add 'cron_tz' property to node definition.""" + + +def migrate(state_sync, **kwargs): # type: ignore + pass diff --git a/sqlmesh/utils/cron.py b/sqlmesh/utils/cron.py index 7950f87df2..c6080b1db7 100644 --- a/sqlmesh/utils/cron.py +++ b/sqlmesh/utils/cron.py @@ -1,7 +1,7 @@ from __future__ import annotations import typing as t -from datetime import datetime, timedelta +from datetime import datetime, timedelta, tzinfo from functools import lru_cache from croniter import croniter @@ -34,21 +34,22 @@ def interval_seconds(cron: str) -> int: class CroniterCache: - def __init__(self, cron: str, time: t.Optional[TimeLike] = None): + def __init__(self, cron: str, time: t.Optional[TimeLike] = None, tz: t.Optional[tzinfo] = None): self.cron = cron - self.curr: datetime = to_datetime(now() if time is None else time) + self.tz = tz + self.curr: datetime = to_datetime(now() if time is None else time, tz=self.tz) self.interval_seconds = interval_seconds(self.cron) def get_next(self, estimate: bool = False) -> datetime: if estimate and self.interval_seconds: self.curr = self.curr + timedelta(seconds=self.interval_seconds) else: - self.curr = 
to_datetime(croniter(self.cron, self.curr).get_next() * 1000) + self.curr = to_datetime(croniter(self.cron, self.curr).get_next() * 1000, tz=self.tz) return self.curr def get_prev(self, estimate: bool = False) -> datetime: if estimate and self.interval_seconds: self.curr = self.curr - timedelta(seconds=self.interval_seconds) else: - self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000) + self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000, tz=self.tz) return self.curr diff --git a/sqlmesh/utils/date.py b/sqlmesh/utils/date.py index 783541e42c..3c3a8fcbcc 100644 --- a/sqlmesh/utils/date.py +++ b/sqlmesh/utils/date.py @@ -5,14 +5,13 @@ import typing as t import warnings -from pandas.api.types import is_datetime64_any_dtype # type: ignore - -from datetime import date, datetime, timedelta, timezone +from datetime import date, datetime, timedelta, timezone, tzinfo import dateparser import pandas as pd from dateparser import freshness_date_parser as freshness_date_parser_module from dateparser.freshness_date_parser import freshness_date_parser +from pandas.api.types import is_datetime64_any_dtype # type: ignore from sqlglot import exp from sqlmesh.utils import ttl_cache @@ -149,6 +148,7 @@ def to_datetime( value: TimeLike, relative_base: t.Optional[datetime] = None, check_categorical_relative_expression: bool = True, + tz: t.Optional[tzinfo] = None, ) -> datetime: """Converts a value into a UTC datetime object. @@ -156,12 +156,13 @@ def to_datetime( value: A variety of date formats. If the value is number-like, it is assumed to be millisecond epochs. relative_base: The datetime to reference for time expressions that are using relative terms. check_categorical_relative_expression: If True, takes into account the relative expressions that are categorical. + tz: Timezone to convert datetime to, defaults to utc Raises: ValueError if value cannot be converted to a datetime. Returns: - A datetime object with tz utc. 
+ A datetime object with tz (default UTC). """ if isinstance(value, datetime): dt: t.Optional[datetime] = value @@ -198,9 +199,11 @@ def to_datetime( if dt is None: raise ValueError(f"Could not convert `{value}` to datetime.") + tz = tz or UTC + if dt.tzinfo: - return dt if dt.tzinfo == UTC else dt.astimezone(UTC) - return dt.replace(tzinfo=UTC) + return dt if dt.tzinfo == tz else dt.astimezone(tz) + return dt.replace(tzinfo=tz) def to_date(value: TimeLike, relative_base: t.Optional[datetime] = None) -> date: diff --git a/sqlmesh/utils/pydantic.py b/sqlmesh/utils/pydantic.py index be61abe879..010a5f14ff 100644 --- a/sqlmesh/utils/pydantic.py +++ b/sqlmesh/utils/pydantic.py @@ -2,6 +2,7 @@ import json import typing as t +from datetime import tzinfo import pydantic from pydantic import ValidationInfo as ValidationInfo @@ -72,6 +73,7 @@ class PydanticModel(pydantic.BaseModel): exp.Tuple: _expression_encoder, AuditQueryTypes: _expression_encoder, # type: ignore ModelQueryTypes: _expression_encoder, # type: ignore + tzinfo: lambda tz: tz.key, }, arbitrary_types_allowed=True, extra="forbid", diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 9cd5b27050..52b4471526 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -4884,6 +4884,67 @@ def test_plan_production_environment_statements(tmp_path: Path): assert environment_statements[0].python_env["__sqlmesh__vars__"].payload == "{'var_5': 5}" +@time_machine.travel("2025-03-08 00:00:00 UTC") +def test_tz(init_and_plan_context): + context, _ = init_and_plan_context("examples/sushi") + + model = context.get_model("sushi.waiter_revenue_by_day") + context.upsert_model( + SqlModel.parse_obj( + {**model.dict(), "cron_tz": "America/Los_Angeles", "start": "2025-03-07"} + ) + ) + + def assert_intervals(plan, intervals): + assert ( + next( + intervals.intervals + for intervals in plan.missing_intervals + if intervals.snapshot_id.name == model.fqn + ) + == intervals + ) + + 
plan = context.plan_builder("prod", skip_tests=True).build() + + # we have missing intervals but not waiter_revenue_by_day because it's not midnight pacific yet + assert plan.missing_intervals + + with pytest.raises(StopIteration): + assert_intervals(plan, []) + + # now we're ready 8AM UTC == midnight PST + with time_machine.travel("2025-03-08 08:00:00 UTC"): + plan = context.plan_builder("prod", skip_tests=True).build() + assert_intervals(plan, [(to_timestamp("2025-03-07"), to_timestamp("2025-03-08"))]) + + with time_machine.travel("2025-03-09 07:00:00 UTC"): + plan = context.plan_builder("prod", skip_tests=True).build() + + assert_intervals( + plan, + [ + (to_timestamp("2025-03-07"), to_timestamp("2025-03-08")), + ], + ) + + with time_machine.travel("2025-03-09 08:00:00 UTC"): + plan = context.plan_builder("prod", skip_tests=True).build() + + assert_intervals( + plan, + [ + (to_timestamp("2025-03-07"), to_timestamp("2025-03-08")), + (to_timestamp("2025-03-08"), to_timestamp("2025-03-09")), + ], + ) + + context.apply(plan) + + plan = context.plan_builder("prod", skip_tests=True).build() + assert not plan.missing_intervals + + def apply_to_environment( context: Context, environment: str, diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index 3262b1c73f..16547423fb 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -860,7 +860,7 @@ def test_fingerprint(model: Model, parent_model: Model): original_fingerprint = SnapshotFingerprint( data_hash="1312415267", - metadata_hash="221611364", + metadata_hash="1125608408", ) assert fingerprint == original_fingerprint @@ -921,7 +921,7 @@ def test_fingerprint_seed_model(): expected_fingerprint = SnapshotFingerprint( data_hash="1909791099", - metadata_hash="3403817841", + metadata_hash="2315134974", ) model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql")) @@ -960,7 +960,7 @@ def test_fingerprint_jinja_macros(model: Model): ) 
original_fingerprint = SnapshotFingerprint( data_hash="923305614", - metadata_hash="221611364", + metadata_hash="1125608408", ) fingerprint = fingerprint_from_node(model, nodes={}) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 1028825bb2..a1446b9b1d 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -3192,8 +3192,8 @@ def insert( def test_custom_materialization_strategy_with_custom_properties(adapter_mock, make_snapshot): custom_insert_kind = None - class TestCustomKind(CustomKind): # type: ignore[no-untyped-def] - _primary_key: t.List[exp.Expression] + class TestCustomKind(CustomKind): + _primary_key: t.List[exp.Expression] # type: ignore[no-untyped-def] @model_validator(mode="after") def _validate_model(self) -> Self: