Skip to content

Commit 239863c

Browse files
authored
Feat!: Fully Qualify Models and Tables with Connection Catalog (#1574)
* feat!: use connection as source for catalog * fmt * remove integration test * update docs
1 parent a8c259d commit 239863c

141 files changed

Lines changed: 3822 additions & 1720 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ repos:
3636
"types": [python]
3737
files: *files
3838
require_serial: true
39+
exclude: ^(tests/fixtures)
3940
- repo: https://github.com/pre-commit/mirrors-prettier
4041
rev: "fc26039"
4142
hooks:

docs/concepts/models/python_models.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ from sqlmesh import ExecutionContext, model
202202
def execute(
203203
context: ExecutionContext,
204204
start: datetime,
205-
end: datetime
205+
end: datetime,
206206
execution_time: datetime,
207207
**kwargs: t.Any,
208208
) -> DataFrame:

docs/reference/model_configuration.md

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,6 @@ The SQLMesh project-level `model_defaults` key supports the following options, d
4646

4747
The project-level `model_defaults` key also supports two keys for specifying a default catalog or schema, described [below](#default-schema--catalog).
4848

49-
#### Default Schema / Catalog
50-
SQLMesh model names may contain one to three levels of nesting, where the naming hierarchy is `catalog.schema.model`.
51-
52-
SQLMesh requires all model names and references to have the same level of nesting. For example, if you decide to specify a catalog in any model name, all model names and references must include a catalog.
53-
54-
You can specify default schema and catalog names in the `model_defaults` key if you want to omit them from some model names.
55-
56-
| Option | Description | Type | Required |
57-
| --------- | ------------------------------------------------------------------ | :--: | :------: |
58-
| `catalog` | The default catalog name for models that do not specify a catalog. | str | N |
59-
| `schema` | The default schema name for models that do not specify a schema. | str | N |
60-
6149
## Model kind properties
6250

6351
Configuration options for kind-specific SQLMesh model properties.

examples/sushi/models/waiters.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,17 @@ def entrypoint(evaluator: MacroEvaluator) -> exp.Select:
3636

3737
parent_snapshots = snapshot.parents
3838
assert len(parent_snapshots) == 1
39-
assert parent_snapshots[0].name.lower() == "sushi.orders"
39+
name = '"sushi"."orders"'
40+
# There are tests which force not having a default catalog so we check here if one is defined
41+
# and add it to the name if it is
42+
default_catalog = evaluator.default_catalog
43+
if default_catalog:
44+
# make sure we don't double quote the default catalog
45+
default_catalog = default_catalog.strip('"')
46+
name = ".".join([f'"{default_catalog}"', name])
47+
assert (
48+
parent_snapshots[0].name == name
49+
), f"Snapshot Name: {parent_snapshots[0].name}, Name: {name}"
4050

4151
excluded = {"id", "customer_id", "start_ts", "end_ts"}
4252
projections = []

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@
6464
"google-cloud-bigquery",
6565
"google-cloud-bigquery-storage",
6666
"black==22.6.0",
67+
"dbt-bigquery; python_version >= '3.8'",
68+
# Remove once we drop support for Python 3.7
69+
"dbt-bigquery==1.5.2; python_version < '3.8'",
6770
"dbt-core",
6871
"dbt-duckdb>=1.4.2",
6972
"dbt-snowflake",

sqlmesh/core/audit/definition.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
from sqlmesh.core import constants as c
1313
from sqlmesh.core import dialect as d
1414
from sqlmesh.core.macros import MacroRegistry, macro
15-
from sqlmesh.core.model.common import bool_validator, expression_validator
15+
from sqlmesh.core.model.common import (
16+
bool_validator,
17+
default_catalog_validator,
18+
depends_on_validator,
19+
expression_validator,
20+
)
1621
from sqlmesh.core.model.definition import _Model, _python_env, _single_value_or_tuple
1722
from sqlmesh.core.node import _Node
1823
from sqlmesh.core.renderer import QueryRenderer
@@ -195,7 +200,7 @@ def render_query(
195200

196201
node = snapshot_or_node if isinstance(snapshot_or_node, _Node) else snapshot_or_node.node
197202
this_model = kwargs.pop("this_model", None) or (
198-
node.name
203+
node.fqn
199204
if isinstance(snapshot_or_node, _Node)
200205
else t.cast(Snapshot, snapshot_or_node).table_name(
201206
deployability_index.is_deployable(snapshot_or_node)
@@ -248,6 +253,7 @@ def _create_query_renderer(self, node: _Node) -> QueryRenderer:
248253
jinja_macro_registry=self.jinja_macros,
249254
python_env=model.python_env,
250255
only_execution_time=model.kind.only_execution_time,
256+
default_catalog=model.default_catalog,
251257
)
252258

253259
def __str__(self) -> str:
@@ -272,6 +278,7 @@ class StandaloneAudit(_Node, AuditMixin):
272278
defaults: t.Dict[str, exp.Expression] = {}
273279
expressions_: t.Optional[t.List[exp.Expression]] = Field(default=None, alias="expressions")
274280
jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
281+
default_catalog: t.Optional[str] = None
275282
depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
276283
hash_raw_query: bool = False
277284
python_env_: t.Optional[t.Dict[str, Executable]] = Field(default=None, alias="python_env")
@@ -285,6 +292,8 @@ class StandaloneAudit(_Node, AuditMixin):
285292
_bool_validator = bool_validator
286293
_string_validator = audit_string_validator
287294
_map_validator = audit_map_validator
295+
_default_catalog_validator = default_catalog_validator
296+
_depends_on_validator = depends_on_validator
288297

289298
@model_validator(mode="after")
290299
@model_validator_v1_args
@@ -301,7 +310,9 @@ def depends_on(self) -> t.Set[str]:
301310

302311
query = self.render_query(self)
303312
if query is not None:
304-
self._depends_on |= d.find_tables(query, dialect=self.dialect)
313+
self._depends_on |= d.find_tables(
314+
query, default_catalog=self.default_catalog, dialect=self.dialect
315+
)
305316

306317
self._depends_on -= {self.name}
307318
return self._depends_on
@@ -427,6 +438,7 @@ def _create_query_renderer(self, node: _Node) -> QueryRenderer:
427438
path=self._path or Path(),
428439
jinja_macro_registry=self.jinja_macros,
429440
python_env=self.python_env,
441+
default_catalog=self.default_catalog,
430442
)
431443

432444

@@ -453,6 +465,7 @@ def load_audit(
453465
macros: t.Optional[MacroRegistry] = None,
454466
jinja_macros: t.Optional[JinjaMacroRegistry] = None,
455467
dialect: t.Optional[str] = None,
468+
default_catalog: t.Optional[str] = None,
456469
) -> Audit:
457470
"""Load an audit from a parsed SQLMesh audit file.
458471
@@ -523,11 +536,16 @@ def load_audit(
523536
extra_kwargs["jinja_macros"] = (jinja_macros or JinjaMacroRegistry()).trim(
524537
jinja_macro_refrences
525538
)
539+
extra_kwargs["default_catalog"] = default_catalog
526540

527541
dialect = meta_fields.pop("dialect", dialect)
528542
try:
529543
audit = audit_class(
530-
query=query, expressions=statements, dialect=dialect, **meta_fields, **extra_kwargs
544+
query=query,
545+
expressions=statements,
546+
dialect=dialect,
547+
**extra_kwargs,
548+
**meta_fields,
531549
)
532550
except Exception as ex:
533551
_raise_config_error(str(ex), path)
@@ -544,6 +562,7 @@ def load_multiple_audits(
544562
macros: t.Optional[MacroRegistry] = None,
545563
jinja_macros: t.Optional[JinjaMacroRegistry] = None,
546564
dialect: t.Optional[str] = None,
565+
default_catalog: t.Optional[str] = None,
547566
) -> t.Generator[Audit, None, None]:
548567
audit_block: t.List[exp.Expression] = []
549568
for expression in expressions:
@@ -556,13 +575,15 @@ def load_multiple_audits(
556575
macros=macros,
557576
jinja_macros=jinja_macros,
558577
dialect=dialect,
578+
default_catalog=default_catalog,
559579
)
560580
audit_block.clear()
561581
audit_block.append(expression)
562582
yield load_audit(
563583
expressions=audit_block,
564584
path=path,
565585
dialect=dialect,
586+
default_catalog=default_catalog,
566587
)
567588

568589

@@ -586,4 +607,5 @@ def _maybe_parse_arg_pair(e: exp.Expression) -> t.Tuple[str, exp.Expression]:
586607
"depends_on_": lambda value: exp.Tuple(expressions=sorted(value)),
587608
"tags": _single_value_or_tuple,
588609
"hash_raw_query": exp.convert,
610+
"default_catalog": exp.to_identifier,
589611
}

sqlmesh/core/config/connection.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,9 @@ def _connection_factory(self) -> t.Callable:
542542

543543
return connect
544544

545+
def get_catalog(self) -> t.Optional[str]:
546+
return self.project
547+
545548

546549
class GCPPostgresConnectionConfig(ConnectionConfig):
547550
"""

sqlmesh/core/config/model.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import typing as t
44

5-
from pydantic import Field
6-
75
from sqlmesh.core.config.base import BaseConfig
86
from sqlmesh.core.model.kind import ModelKind, model_kind_validator
97
from sqlmesh.utils.date import TimeLike
@@ -26,8 +24,6 @@ class ModelDefaultsConfig(BaseConfig):
2624
will be chunked such that each individual job will only contain jobs with max `batch_size` intervals.
2725
storage_format: The storage format used to store the physical table, only applicable in certain engines.
2826
(eg. 'parquet')
29-
catalog: The default catalog to use if one is not provided (catalog.schema.table).
30-
schema: The default schema to use if one is not provided. (catalog.schema.table).
3127
"""
3228

3329
kind: t.Optional[ModelKind] = None
@@ -37,7 +33,5 @@ class ModelDefaultsConfig(BaseConfig):
3733
start: t.Optional[TimeLike] = None
3834
batch_size: t.Optional[int] = None
3935
storage_format: t.Optional[str] = None
40-
catalog: t.Optional[str] = None
41-
schema_: t.Optional[str] = Field(default=None, alias="schema")
4236

4337
_model_kind_validator = model_kind_validator

sqlmesh/core/config/scheduler.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ def create_state_sync(self, context: Context) -> StateSync:
5454
The StateSync instance.
5555
"""
5656

57+
@abc.abstractmethod
58+
def get_default_catalog(self, context: Context) -> t.Optional[str]:
59+
"""Returns the default catalog for the Scheduler.
60+
61+
Args:
62+
context: The SQLMesh Context.
63+
"""
64+
5765

5866
class _EngineAdapterStateSyncSchedulerConfig(_SchedulerConfig):
5967
def create_state_sync(self, context: Context) -> StateSync:
@@ -69,6 +77,9 @@ def create_state_sync(self, context: Context) -> StateSync:
6977
schema = context.config.get_state_schema(context.gateway)
7078
return EngineAdapterStateSync(engine_adapter, schema=schema, console=context.console)
7179

80+
def get_default_catalog(self, context: Context) -> t.Optional[str]:
81+
return context.engine_adapter.default_catalog
82+
7283

7384
class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig):
7485
"""The Built-In Scheduler configuration."""
@@ -79,6 +90,7 @@ def create_plan_evaluator(self, context: Context) -> PlanEvaluator:
7990
return BuiltInPlanEvaluator(
8091
state_sync=context.state_sync,
8192
snapshot_evaluator=context.snapshot_evaluator,
93+
default_catalog=self.get_default_catalog(context),
8294
backfill_concurrent_tasks=context.concurrent_tasks,
8395
console=context.console,
8496
notification_target_manager=context.notification_target_manager,
@@ -125,6 +137,9 @@ def create_plan_evaluator(self, context: Context) -> PlanEvaluator:
125137
state_sync=context.state_sync if self.use_state_connection else None,
126138
)
127139

140+
def get_default_catalog(self, context: Context) -> t.Optional[str]:
141+
return None
142+
128143

129144
class AirflowSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig):
130145
"""The Airflow Scheduler configuration.
@@ -280,6 +295,9 @@ def create_plan_evaluator(self, context: Context) -> PlanEvaluator:
280295
users=context.users,
281296
)
282297

298+
def get_default_catalog(self, context: Context) -> t.Optional[str]:
299+
return None
300+
283301

284302
SchedulerConfig = Annotated[
285303
t.Union[

0 commit comments

Comments
 (0)