Merged
4 changes: 2 additions & 2 deletions docs/concepts/models/model_kinds.md
@@ -318,7 +318,7 @@ In addition to specifying a time column in the `MODEL` DDL, the model's query mu

A model's `time_column` should be in the [UTC time zone](https://en.wikipedia.org/wiki/Coordinated_Universal_Time) to ensure correct interaction with SQLMesh's scheduler and predefined macro variables.

This requirement aligns with the data engineering best practice of converting datetime/timestamp columns to UTC as soon as they are ingested into the data system and only converting them to local timezones when they exit the system for downstream uses.
This requirement aligns with the data engineering best practice of converting datetime/timestamp columns to UTC as soon as they are ingested into the data system and only converting them to local timezones when they exit the system for downstream uses. The `cron_tz` flag **does not** change this requirement.

Placing all timezone conversion code in the system's first/last transformation models prevents inadvertent timezone-related errors as data flows between models.

@@ -1598,4 +1598,4 @@ Since it's unmanaged, it doesnt support the `batch_size` and `batch_concurrency`

Similar to `INCREMENTAL_BY_PARTITION`, attempting to [restate](../plans.md#restatement-plans) an `INCREMENTAL_UNMANAGED` model will trigger a full restatement. That is, the model will be rebuilt from scratch rather than from a time slice you specify.

This is because an append-only table is inherently non-idempotent. Restating `INCREMENTAL_UNMANAGED` models may lead to data loss and should be performed with care.
This is because an append-only table is inherently non-idempotent. Restating `INCREMENTAL_UNMANAGED` models may lead to data loss and should be performed with care.
5 changes: 4 additions & 1 deletion docs/concepts/models/overview.md
@@ -227,7 +227,10 @@ Learn more about these properties and their default values in the [model configu
: Tags are one or more labels used to organize your models.

### cron
: Cron is used to schedule when your model processes or refreshes data. It accepts a [cron expression](https://en.wikipedia.org/wiki/Cron) or any of `@hourly`, `@daily`, `@weekly`, or `@monthly`. All times are assumed to be UTC timezone - it is not possible to specify them in a different timezone.
: Cron is used to schedule when your model processes or refreshes data. It accepts a [cron expression](https://en.wikipedia.org/wiki/Cron) or any of `@hourly`, `@daily`, `@weekly`, or `@monthly`. All times are assumed to be in the UTC time zone by default.

### cron_tz
: Cron timezone specifies the time zone in which the cron expression is evaluated. It is used only for scheduling and does not affect the intervals processed in an incremental model. For example, if a model is `@daily` with `cron_tz` set to `America/Los_Angeles`, it runs every day at 12 AM Pacific Time; however, the `start` and `end` variables passed to the incremental model still represent UTC date boundaries.

### interval_unit
: Interval unit determines the temporal granularity with which time intervals are calculated for the model.
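
To make the `cron_tz` example above concrete, here is a minimal sketch using only Python's standard library (it does not call SQLMesh). The helper name `daily_run_and_interval` is hypothetical, and the assumption that a run processes the most recent complete UTC day is taken from the `America/Los_Angeles` example, not from SQLMesh's scheduler code.

```python
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def daily_run_and_interval(local_day: date, tz: ZoneInfo):
    """Return (run_at_utc, interval_start, interval_end) for a @daily cron.

    Hypothetical helper: the run fires at local midnight in `tz`, while the
    processed interval is assumed to be the latest complete UTC day.
    """
    local_midnight = datetime(local_day.year, local_day.month, local_day.day, tzinfo=tz)
    run_at_utc = local_midnight.astimezone(timezone.utc)
    # Interval boundaries stay on UTC midnights regardless of cron_tz.
    interval_end = run_at_utc.replace(hour=0, minute=0, second=0, microsecond=0)
    interval_start = interval_end - timedelta(days=1)
    return run_at_utc, interval_start, interval_end


run_at, start, end = daily_run_and_interval(date(2025, 3, 8), ZoneInfo("America/Los_Angeles"))
print(run_at.isoformat())  # 2025-03-08T08:00:00+00:00
print(start.isoformat(), end.isoformat())
```

The run is triggered at midnight Pacific time (08:00 UTC), but the interval it covers is still the UTC day 2025-03-07.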
2 changes: 2 additions & 0 deletions sqlmesh/core/audit/definition.py
@@ -270,6 +270,8 @@ def metadata_hash(self) -> str:
*sorted(self.tags),
str(self.sorted_python_env),
self.stamp,
self.cron,
self.cron_tz.key if self.cron_tz else None,
]

query = self.render_audit_query() or self.query
1 change: 1 addition & 0 deletions sqlmesh/core/model/definition.py
@@ -1086,6 +1086,7 @@ def metadata_hash(self) -> str:
self.description,
json.dumps(self.column_descriptions, sort_keys=True),
self.cron,
self.cron_tz.key if self.cron_tz else None,
str(self.start) if self.start else None,
str(self.end) if self.end else None,
str(self.retention) if self.retention else None,
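
Adding `cron_tz` to the hashed metadata fields means that changing a model's cron time zone produces a new metadata hash, and even an unset `cron_tz` contributes one extra element to the hashed list. That is presumably why the expected `metadata_hash` values in the snapshot tests change in this PR. The sketch below illustrates the idea with hypothetical `metadata_hash` and `node_fields` helpers built on `zlib.crc32`; it is a stand-in and not SQLMesh's actual hashing routine.

```python
import zlib
from typing import List, Optional, Sequence
from zoneinfo import ZoneInfo


def metadata_hash(fields: Sequence[Optional[str]]) -> str:
    """Hypothetical hash: fold each field into a running CRC32, separated by ';'."""
    crc = 0
    for field in fields:
        crc = zlib.crc32(str(field).encode("utf-8"), crc)
        crc = zlib.crc32(b";", crc)
    return str(crc)


def node_fields(cron: str, cron_tz: Optional[ZoneInfo]) -> List[Optional[str]]:
    # Mirrors the diff: the time zone contributes its IANA key, or None when unset.
    return [cron, cron_tz.key if cron_tz else None]


utc_hash = metadata_hash(node_fields("@daily", None))
la_hash = metadata_hash(node_fields("@daily", ZoneInfo("America/Los_Angeles")))
legacy_hash = metadata_hash(["@daily"])  # field list before cron_tz was added
print(utc_hash, la_hash, legacy_hash)
```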
28 changes: 26 additions & 2 deletions sqlmesh/core/node.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import typing as t
import zoneinfo
from datetime import datetime
from enum import Enum
from pathlib import Path
@@ -177,6 +178,7 @@ class _Node(PydanticModel):
the date from the scheduler will be used
cron: A cron string specifying how often the node should be run, leveraging the
[croniter](https://github.com/kiorky/croniter) library.
cron_tz: The time zone for the cron expression, given as an [IANA time zone](https://docs.python.org/3/library/zoneinfo.html) name. Defaults to UTC.
interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
tags: A list of tags that can be used to filter nodes.
stamp: An optional arbitrary string sequence used to create new node versions without making
@@ -190,6 +192,7 @@ class _Node(PydanticModel):
start: t.Optional[TimeLike] = None
end: t.Optional[TimeLike] = None
cron: SQLGlotCron = "@daily"
cron_tz: t.Optional[zoneinfo.ZoneInfo] = None
interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
tags: t.List[str] = []
stamp: t.Optional[str] = None
@@ -226,6 +229,27 @@ def _name_validator(cls, v: t.Any) -> t.Optional[str]:
return v.meta["sql"]
return str(v)

    @field_validator("cron_tz", mode="before")
    def _cron_tz_validator(cls, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
        if not v or v == "UTC":
            return None

        v = str_or_exp_to_str(v)

        try:
            return zoneinfo.ZoneInfo(v)
        except Exception as e:
            available_timezones = zoneinfo.available_timezones()

            if available_timezones:
                raise ConfigError(f"{e}. {v} must be in {available_timezones}.")
            else:
                raise ConfigError(
                    f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC."
                )

        return None
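
The validator's behavior can be approximated outside pydantic with a plain function. This is a minimal sketch, assuming string input only: the name `parse_cron_tz` is hypothetical, and it raises `ValueError` where the diff raises SQLMesh's `ConfigError`. An empty value or `"UTC"` maps to `None`, so UTC is represented by the absence of a time zone.

```python
import zoneinfo
from typing import Optional


def parse_cron_tz(value: Optional[str]) -> Optional[zoneinfo.ZoneInfo]:
    """Hypothetical stand-in for the cron_tz validator (string input only)."""
    # Unset and "UTC" both normalize to None, matching the default.
    if not value or value == "UTC":
        return None
    try:
        return zoneinfo.ZoneInfo(value)
    except (KeyError, ValueError, OSError) as e:
        # ZoneInfoNotFoundError is a KeyError subclass; malformed keys raise ValueError.
        raise ValueError(f"{e}. {value!r} is not a usable IANA time zone name.") from e


print(parse_cron_tz("UTC"))
print(parse_cron_tz("America/Los_Angeles"))
```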

@field_validator("start", "end", mode="before")
@classmethod
def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
@@ -317,9 +341,9 @@ def metadata_hash(self) -> str:

def croniter(self, value: TimeLike) -> CroniterCache:
if self._croniter is None:
self._croniter = CroniterCache(self.cron, value)
self._croniter = CroniterCache(self.cron, value, tz=self.cron_tz)
else:
self._croniter.curr = to_datetime(value)
self._croniter.curr = to_datetime(value, tz=self.cron_tz)
return self._croniter

def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
5 changes: 5 additions & 0 deletions sqlmesh/migrations/v0076_add_cron_tz.py
@@ -0,0 +1,5 @@
"""Add 'cron_tz' property to node definition."""


def migrate(state_sync, **kwargs):  # type: ignore
    pass
11 changes: 6 additions & 5 deletions sqlmesh/utils/cron.py
@@ -1,7 +1,7 @@
from __future__ import annotations

import typing as t
from datetime import datetime, timedelta
from datetime import datetime, timedelta, tzinfo
from functools import lru_cache

from croniter import croniter
@@ -34,21 +34,22 @@ def interval_seconds(cron: str) -> int:


class CroniterCache:
def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
def __init__(self, cron: str, time: t.Optional[TimeLike] = None, tz: t.Optional[tzinfo] = None):
self.cron = cron
self.curr: datetime = to_datetime(now() if time is None else time)
self.tz = tz
self.curr: datetime = to_datetime(now() if time is None else time, tz=self.tz)
self.interval_seconds = interval_seconds(self.cron)

def get_next(self, estimate: bool = False) -> datetime:
if estimate and self.interval_seconds:
self.curr = self.curr + timedelta(seconds=self.interval_seconds)
else:
self.curr = to_datetime(croniter(self.cron, self.curr).get_next() * 1000)
self.curr = to_datetime(croniter(self.cron, self.curr).get_next() * 1000, tz=self.tz)
return self.curr

def get_prev(self, estimate: bool = False) -> datetime:
if estimate and self.interval_seconds:
self.curr = self.curr - timedelta(seconds=self.interval_seconds)
else:
self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000)
self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000, tz=self.tz)
return self.curr
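
One consequence of making `self.curr` time-zone aware is worth spelling out for the `estimate` branch: in Python, adding a `timedelta` to an aware `datetime` performs wall-clock arithmetic in that datetime's own time zone. The sketch below uses only the standard library and does not involve `CroniterCache`. It shows that adding 86,400 seconds across the 2025 US daylight-saving transition lands on the next local midnight, which is only 23 hours later in absolute time. Whether this is the intended behavior of the estimate path is not stated in the PR.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

la = ZoneInfo("America/Los_Angeles")

# Local midnight on the day US daylight saving time starts (2025-03-09).
curr = datetime(2025, 3, 9, tzinfo=la)

# Same arithmetic as the estimate branch for a daily cron: curr + interval_seconds.
estimated_next = curr + timedelta(seconds=86_400)

# Compare in UTC to measure the real elapsed time between the two instants.
elapsed = estimated_next.astimezone(timezone.utc) - curr.astimezone(timezone.utc)

print(estimated_next.isoformat())  # 2025-03-10T00:00:00-07:00
print(elapsed)  # 23:00:00
```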
15 changes: 9 additions & 6 deletions sqlmesh/utils/date.py
@@ -5,14 +5,13 @@
import typing as t
import warnings

from pandas.api.types import is_datetime64_any_dtype # type: ignore

from datetime import date, datetime, timedelta, timezone
from datetime import date, datetime, timedelta, timezone, tzinfo

import dateparser
import pandas as pd
from dateparser import freshness_date_parser as freshness_date_parser_module
from dateparser.freshness_date_parser import freshness_date_parser
from pandas.api.types import is_datetime64_any_dtype # type: ignore
from sqlglot import exp

from sqlmesh.utils import ttl_cache
@@ -149,19 +148,21 @@ def to_datetime(
value: TimeLike,
relative_base: t.Optional[datetime] = None,
check_categorical_relative_expression: bool = True,
tz: t.Optional[tzinfo] = None,
) -> datetime:
"""Converts a value into a UTC datetime object.

Args:
value: A variety of date formats. If the value is number-like, it is assumed to be millisecond epochs.
relative_base: The datetime to reference for time expressions that are using relative terms.
check_categorical_relative_expression: If True, takes into account the relative expressions that are categorical.
tz: Timezone to convert datetime to, defaults to utc

Raises:
ValueError if value cannot be converted to a datetime.

Returns:
A datetime object with tz utc.
A datetime object with tz (default UTC).
"""
if isinstance(value, datetime):
dt: t.Optional[datetime] = value
@@ -198,9 +199,11 @@ def to_datetime(
if dt is None:
raise ValueError(f"Could not convert `{value}` to datetime.")

tz = tz or UTC

if dt.tzinfo:
return dt if dt.tzinfo == UTC else dt.astimezone(UTC)
return dt.replace(tzinfo=UTC)
return dt if dt.tzinfo == tz else dt.astimezone(tz)
return dt.replace(tzinfo=tz)
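
The final branch of `to_datetime` treats aware and naive inputs differently, which is easy to miss. The sketch below isolates just that step in a hypothetical `normalize` function (the parsing logic is omitted) and uses a fixed UTC-8 offset as a stand-in for a real cron time zone. An aware datetime is converted, so the instant is preserved; a naive datetime is simply labeled with the target zone, so its wall-clock fields are preserved.

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Optional


def normalize(dt: datetime, tz: Optional[tzinfo] = None) -> datetime:
    """Hypothetical extract of the tz handling at the end of to_datetime."""
    tz = tz or timezone.utc
    if dt.tzinfo:
        # Aware input: same instant, expressed in the target zone.
        return dt.astimezone(tz)
    # Naive input: wall-clock fields are kept and interpreted in the target zone.
    return dt.replace(tzinfo=tz)


utc_minus_8 = timezone(timedelta(hours=-8))  # fixed-offset stand-in for a cron_tz

aware = datetime(2025, 3, 8, 8, tzinfo=timezone.utc)
naive = datetime(2025, 3, 8)

print(normalize(aware, utc_minus_8).isoformat())  # 2025-03-08T00:00:00-08:00
print(normalize(naive, utc_minus_8).isoformat())  # 2025-03-08T00:00:00-08:00
print(normalize(naive).isoformat())  # 2025-03-08T00:00:00+00:00
```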


def to_date(value: TimeLike, relative_base: t.Optional[datetime] = None) -> date:
2 changes: 2 additions & 0 deletions sqlmesh/utils/pydantic.py
@@ -2,6 +2,7 @@

import json
import typing as t
from datetime import tzinfo

import pydantic
from pydantic import ValidationInfo as ValidationInfo
@@ -72,6 +73,7 @@ class PydanticModel(pydantic.BaseModel):
exp.Tuple: _expression_encoder,
AuditQueryTypes: _expression_encoder, # type: ignore
ModelQueryTypes: _expression_encoder, # type: ignore
tzinfo: lambda tz: tz.key,
},
arbitrary_types_allowed=True,
extra="forbid",
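
The `tzinfo: lambda tz: tz.key` encoder relies on `zoneinfo.ZoneInfo` exposing its IANA name through the `key` attribute. A rough standalone equivalent using `json.dumps` (not pydantic's encoder machinery) is sketched below; the `encode_tz` helper is hypothetical. Note that other `tzinfo` implementations such as `datetime.timezone.utc` have no `key` attribute, so this approach appears to depend on the validator storing either a `ZoneInfo` or `None`.

```python
import json
from zoneinfo import ZoneInfo


def encode_tz(value):
    """Hypothetical JSON fallback encoder: serialize a ZoneInfo as its IANA key."""
    if isinstance(value, ZoneInfo):
        return value.key
    raise TypeError(f"Cannot serialize {type(value).__name__}")


payload = json.dumps({"cron_tz": ZoneInfo("America/Los_Angeles")}, default=encode_tz)
print(payload)  # {"cron_tz": "America/Los_Angeles"}

# The key is enough to rebuild the time zone when the payload is loaded again.
restored = ZoneInfo(json.loads(payload)["cron_tz"])
```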
61 changes: 61 additions & 0 deletions tests/core/test_integration.py
@@ -4884,6 +4884,67 @@ def test_plan_production_environment_statements(tmp_path: Path):
assert environment_statements[0].python_env["__sqlmesh__vars__"].payload == "{'var_5': 5}"


@time_machine.travel("2025-03-08 00:00:00 UTC")
def test_tz(init_and_plan_context):
    context, _ = init_and_plan_context("examples/sushi")

    model = context.get_model("sushi.waiter_revenue_by_day")
    context.upsert_model(
        SqlModel.parse_obj(
            {**model.dict(), "cron_tz": "America/Los_Angeles", "start": "2025-03-07"}
        )
    )

    def assert_intervals(plan, intervals):
        assert (
            next(
                intervals.intervals
                for intervals in plan.missing_intervals
                if intervals.snapshot_id.name == model.fqn
            )
            == intervals
        )

    plan = context.plan_builder("prod", skip_tests=True).build()

    # we have missing intervals but not waiter_revenue_by_day because it's not midnight pacific yet
    assert plan.missing_intervals

    with pytest.raises(StopIteration):
        assert_intervals(plan, [])

    # now we're ready 8AM UTC == midnight PST
    with time_machine.travel("2025-03-08 08:00:00 UTC"):
        plan = context.plan_builder("prod", skip_tests=True).build()
        assert_intervals(plan, [(to_timestamp("2025-03-07"), to_timestamp("2025-03-08"))])

    with time_machine.travel("2025-03-09 07:00:00 UTC"):
        plan = context.plan_builder("prod", skip_tests=True).build()

        assert_intervals(
            plan,
            [
                (to_timestamp("2025-03-07"), to_timestamp("2025-03-08")),
            ],
        )

    with time_machine.travel("2025-03-09 08:00:00 UTC"):
        plan = context.plan_builder("prod", skip_tests=True).build()

        assert_intervals(
            plan,
            [
                (to_timestamp("2025-03-07"), to_timestamp("2025-03-08")),
                (to_timestamp("2025-03-08"), to_timestamp("2025-03-09")),
            ],
        )

        context.apply(plan)

        plan = context.plan_builder("prod", skip_tests=True).build()
        assert not plan.missing_intervals
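
The travel times in `test_tz` straddle the start of US daylight saving time (2025-03-09), which is why 08:00 UTC marks local midnight on both 2025-03-08 and 2025-03-09. A small standard-library check, independent of SQLMesh, shows the UTC hour of local midnight in `America/Los_Angeles` for three consecutive days:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

la = ZoneInfo("America/Los_Angeles")

# UTC hour at which local midnight occurs on March 8, 9, and 10, 2025.
utc_hours = [
    datetime(2025, 3, day, tzinfo=la).astimezone(timezone.utc).hour
    for day in (8, 9, 10)
]
print(utc_hours)  # [8, 8, 7]
```

The clocks change at 02:00 local time on 2025-03-09, so midnight on that day is still at UTC-8, and the first local midnight at UTC-7 falls on 2025-03-10.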


def apply_to_environment(
context: Context,
environment: str,
6 changes: 3 additions & 3 deletions tests/core/test_snapshot.py
@@ -860,7 +860,7 @@ def test_fingerprint(model: Model, parent_model: Model):

original_fingerprint = SnapshotFingerprint(
data_hash="1312415267",
metadata_hash="221611364",
metadata_hash="1125608408",
)

assert fingerprint == original_fingerprint
@@ -921,7 +921,7 @@ def test_fingerprint_seed_model():

expected_fingerprint = SnapshotFingerprint(
data_hash="1909791099",
metadata_hash="3403817841",
metadata_hash="2315134974",
)

model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))
@@ -960,7 +960,7 @@ def test_fingerprint_jinja_macros(model: Model):
)
original_fingerprint = SnapshotFingerprint(
data_hash="923305614",
metadata_hash="221611364",
metadata_hash="1125608408",
)

fingerprint = fingerprint_from_node(model, nodes={})
4 changes: 2 additions & 2 deletions tests/core/test_snapshot_evaluator.py
@@ -3192,8 +3192,8 @@ def insert(
def test_custom_materialization_strategy_with_custom_properties(adapter_mock, make_snapshot):
custom_insert_kind = None

class TestCustomKind(CustomKind): # type: ignore[no-untyped-def]
_primary_key: t.List[exp.Expression]
class TestCustomKind(CustomKind):
_primary_key: t.List[exp.Expression] # type: ignore[no-untyped-def]

@model_validator(mode="after")
def _validate_model(self) -> Self: