Skip to content

Commit 235e2c7

Browse files
authored
Enable audits when evaluating snapshots in Airflow. Fixes #35 (#44)
1 parent 595e81f commit 235e2c7

3 files changed

Lines changed: 10 additions & 1 deletion

File tree

sqlmesh/core/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ def evaluate(
9090
mapping=mapping,
9191
**kwargs,
9292
)
93-
self.state_sync.add_interval(snapshot.snapshot_id, start, end)
9493
self.snapshot_evaluator.audit(
9594
snapshot=snapshot,
9695
start=start,
@@ -99,6 +98,7 @@ def evaluate(
9998
mapping=mapping,
10099
**kwargs,
101100
)
101+
self.state_sync.add_interval(snapshot.snapshot_id, start, end)
102102
self.console.update_snapshot_progress(snapshot.name, 1)
103103

104104
def run(

sqlmesh/core/snapshot_evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def evaluate(
7676
as a dataframe.
7777
kwargs: Additional kwargs to pass to the renderer.
7878
"""
79+
logger.info("Evaluating snapshot %s", snapshot.snapshot_id)
7980
if snapshot.is_embedded_kind:
8081
return None
8182

@@ -220,6 +221,7 @@ def audit(
220221
collection_exceptions:
221222
kwargs: Additional kwargs to pass to the renderer.
222223
"""
224+
logger.info("Auditing snapshot %s", snapshot.snapshot_id)
223225
results = []
224226
for audit, query in snapshot.model.render_audit_queries(
225227
start=start,

sqlmesh/engines/commands.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ def evaluate(
5656
command_payload.latest,
5757
mapping=command_payload.table_mapping,
5858
)
59+
evaluator.audit(
60+
snapshot=command_payload.snapshot,
61+
start=command_payload.start,
62+
end=command_payload.end,
63+
latest=command_payload.latest,
64+
mapping=command_payload.table_mapping,
65+
)
5966

6067

6168
def promote(

0 commit comments

Comments
 (0)