
Commit df611d1

Add configuration parameters for the number of concurrent tasks used for model evaluation and backfilling
1 parent 3fee692 commit df611d1

13 files changed

Lines changed: 67 additions & 14 deletions


example/config.py

Lines changed: 9 additions & 2 deletions
@@ -30,17 +30,24 @@
 
 
 # A config that uses Airflow + Spark.
+DEFAULT_AIRFLOW_KWARGS = {
+    **DEFAULT_KWARGS,
+    "backfill_concurrent_tasks": 4,
+    "ddl_concurrent_tasks": 4,
+}
+
+
 airflow_config = Config(
     **{
-        **DEFAULT_KWARGS,
+        **DEFAULT_AIRFLOW_KWARGS,
         "scheduler_backend": AirflowSchedulerBackend(),
     }
 )
 
 
 airflow_config_docker = Config(
     **{
-        **DEFAULT_KWARGS,
+        **DEFAULT_AIRFLOW_KWARGS,
         "scheduler_backend": AirflowSchedulerBackend(
             airflow_url="http://airflow-webserver:8080/"
         ),
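The example config above pins both knobs to 4 for the Airflow-backed configs. For a project that runs on the built-in scheduler, the same fields can be set directly on Config; a minimal sketch (the variable name and values are illustrative, and the import path is assumed from the file shown below):

from sqlmesh.core.config import Config

# Illustrative only: evaluate and backfill with up to 8 worker threads each,
# while leaving DDL operations single-threaded (the default).
local_config = Config(
    backfill_concurrent_tasks=8,
    evaluation_concurrent_tasks=8,
)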

sqlmesh/core/config.py

Lines changed: 8 additions & 0 deletions
@@ -173,6 +173,7 @@ def create_plan_evaluator(self, context: Context) -> PlanEvaluator:
         return BuiltInPlanEvaluator(
             state_sync=context.state_sync,
             snapshot_evaluator=context.snapshot_evaluator,
+            backfill_concurrent_tasks=context.backfill_concurrent_tasks,
             console=context.console,
         )
 
@@ -218,6 +219,7 @@ def create_plan_evaluator(self, context: Context) -> PlanEvaluator:
             dag_creation_max_retry_attempts=self.dag_creation_max_retry_attempts,
             console=context.console,
             notification_targets=context.notification_targets,
+            backfill_concurrent_tasks=context.backfill_concurrent_tasks,
             ddl_concurrent_tasks=context.ddl_concurrent_tasks,
         )
 
@@ -267,8 +269,12 @@ class Config(PydanticModel):
         snapshot_ttl: Duration before unpromoted snapshots are removed.
         time_column_format: The default format to use for all model time columns. Defaults to %Y-%m-%d.
             This time format uses python format codes. https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes.
+        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during
+            plan application. Default: 1.
         ddl_concurrent_task: The number of concurrent tasks used for DDL
             operations (table / view creation, deletion, etc). Default: 1.
+        evaluation_concurrent_tasks: The number of concurrent tasks used for model evaluation when
+            running with the built-in scheduler. Default: 1.
     """
 
     engine_connection_factory: t.Callable[[], t.Any] = duckdb.connect
@@ -280,7 +286,9 @@ class Config(PydanticModel):
     snapshot_ttl: str = ""
     ignore_patterns: t.List[str] = []
     time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT
+    backfill_concurrent_tasks: int = 1
     ddl_concurrent_tasks: int = 1
+    evaluation_concurrent_tasks: int = 1
 
     class Config:
         arbitrary_types_allowed = True

sqlmesh/core/context.py

Lines changed: 22 additions & 2 deletions
@@ -149,8 +149,12 @@ class Context(BaseContext):
         physical_schema: The schema used to store physical materialized tables.
         snapshot_ttl: Duration before unpromoted snapshots are removed.
         path: The directory containing SQLMesh files.
+        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during
+            plan application. Default: 1.
         ddl_concurrent_task: The number of concurrent tasks used for DDL
             operations (table / view creation, deletion, etc). Default: 1.
+        evaluation_concurrent_tasks: The number of concurrent tasks used for model evaluation when
+            running with the built-in scheduler. Default: 1.
         config: A Config object or the name of a Config object in config.py.
         test_config: A Config object or name of a Config object in config.py to use for testing only
         load: Whether or not to automatically load all models and macros (default True).
@@ -166,7 +170,9 @@ def __init__(
         physical_schema: str = "",
         snapshot_ttl: str = "",
         path: str = "",
+        backfill_concurrent_tasks: t.Optional[int] = None,
         ddl_concurrent_tasks: t.Optional[int] = None,
+        evaluation_concurrent_tasks: t.Optional[int] = None,
         config: t.Optional[t.Union[Config, str]] = None,
         test_config: t.Optional[t.Union[Config, str]] = None,
         load: bool = True,
@@ -200,20 +206,33 @@ def __init__(
         self.macros = UniqueKeyDict("macros")
         self.dag: DAG[str] = DAG()
 
+        self.backfill_concurrent_tasks = (
+            backfill_concurrent_tasks or self.config.backfill_concurrent_tasks
+        )
         self.ddl_concurrent_tasks = (
             ddl_concurrent_tasks or self.config.ddl_concurrent_tasks
         )
+        self.evaluation_concurrent_tasks = (
+            evaluation_concurrent_tasks or self.config.evaluation_concurrent_tasks
+        )
+        self.is_multithreaded = (
+            max(
+                self.backfill_concurrent_tasks,
+                self.ddl_concurrent_tasks,
+                self.evaluation_concurrent_tasks,
+            )
+            > 1
+        )
 
         self._engine_adapter = engine_adapter or EngineAdapter(
             self.config.engine_connection_factory,
             self.config.engine_dialect,
-            multithreaded=self.ddl_concurrent_tasks > 1,
+            multithreaded=self.is_multithreaded,
         )
         self.test_engine_adapter = (
             EngineAdapter(
                 self.test_config.engine_connection_factory,
                 self.test_config.engine_dialect,
-                multithreaded=self.test_config.ddl_concurrent_tasks > 1,
             )
             if self.test_config
             else None
@@ -270,6 +289,7 @@ def scheduler(self) -> Scheduler:
             self.snapshots,
             self.snapshot_evaluator,
             self.state_sync,
+            max_workers=self.evaluation_concurrent_tasks,
             console=self.console,
         )
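Per the constructor changes above, any of the three limits can also be overridden on an individual Context and take precedence over the Config defaults; a minimal sketch (the project path and values are illustrative, and the import path follows the file shown above):

from sqlmesh.core.context import Context

# Illustrative only: this Context backfills with 4 threads and evaluates
# models with 2, regardless of what the project's Config specifies.
context = Context(
    path="example",
    backfill_concurrent_tasks=4,
    evaluation_concurrent_tasks=2,
)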

sqlmesh/core/plan_evaluator.py

Lines changed: 6 additions & 0 deletions
@@ -50,10 +50,12 @@ def __init__(
         self,
         state_sync: StateSync,
         snapshot_evaluator: SnapshotEvaluator,
+        backfill_concurrent_tasks: int = 1,
         console: t.Optional[Console] = None,
     ):
         self.state_sync = state_sync
         self.snapshot_evaluator = snapshot_evaluator
+        self.backfill_concurrent_tasks = backfill_concurrent_tasks
         self.console = console or get_console()
 
     def evaluate(self, plan: Plan) -> None:
@@ -75,6 +77,7 @@ def evaluate(self, plan: Plan) -> None:
             {snapshot.name: snapshot for snapshot in snapshots},
             self.snapshot_evaluator,
             self.state_sync,
+            max_workers=self.backfill_concurrent_tasks,
             console=self.console,
         )
         scheduler.run(snapshots, plan.start, plan.end)
@@ -138,6 +141,7 @@ def __init__(
         dag_creation_poll_interval_secs: int = 30,
         dag_creation_max_retry_attempts: int = 10,
         notification_targets: t.Optional[t.List[NotificationTarget]] = None,
+        backfill_concurrent_tasks: int = 1,
         ddl_concurrent_tasks: int = 1,
     ):
         self.airflow_client = airflow_client
@@ -147,6 +151,7 @@ def __init__(
         self.dag_creation_max_retry_attempts = dag_creation_max_retry_attempts
         self.console = console or get_console()
         self.notification_targets = notification_targets or []
+        self.backfill_concurrent_tasks = backfill_concurrent_tasks
         self.ddl_concurrent_tasks = ddl_concurrent_tasks
 
     def evaluate(self, plan: Plan) -> None:
@@ -161,6 +166,7 @@ def evaluate(self, plan: Plan) -> None:
             no_gaps=plan.no_gaps,
             restatements=plan.restatements,
             notification_targets=self.notification_targets,
+            backfill_concurrent_tasks=self.backfill_concurrent_tasks,
             ddl_concurrent_tasks=self.ddl_concurrent_tasks,
         )
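In the built-in evaluator the new value ends up as the scheduler's max_workers, which bounds how many snapshots are backfilled in parallel. A generic sketch of that thread-pool pattern (not SQLMesh's actual Scheduler internals; names and batches are illustrative):

from concurrent.futures import ThreadPoolExecutor

def run_batches(batches, evaluate, max_workers=1):
    # Each batch holds snapshots with no dependencies on one another, so its
    # members may run in parallel; batches themselves run in dependency order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for batch in batches:
            list(pool.map(evaluate, batch))  # wait for the whole batch to finish

run_batches([[1, 2, 3], [4]], print, max_workers=2)  # toy usage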

sqlmesh/engines/spark/app.py

Lines changed: 5 additions & 4 deletions
@@ -32,13 +32,14 @@ def main() -> None:
     if not command_handler:
         raise NotSupportedError(f"Command '{command_type.value}' not supported")
 
-    ddl_concurrent_tasks = int(sys.argv[2]) if len(sys.argv) > 2 else 1
-
     spark = create_spark_session()
-    connection = spark_session_db.connection(spark)
+
+    ddl_concurrent_tasks = int(sys.argv[2]) if len(sys.argv) > 2 else 1
     evaluator = SnapshotEvaluator(
         EngineAdapter(
-            lambda: connection, "spark", multithreaded=ddl_concurrent_tasks > 1
+            lambda: spark_session_db.connection(spark),
+            "spark",
+            multithreaded=ddl_concurrent_tasks > 1,
         ),
         ddl_concurrent_tasks=ddl_concurrent_tasks,
     )
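Note the change to the connection factory: the old `lambda: connection` captured a single connection shared by every caller, while `lambda: spark_session_db.connection(spark)` creates a connection on each call, presumably so a multithreaded adapter can hand each worker its own connection. A generic standalone illustration of the difference (not the SQLMesh API):

def make_connection():
    # Stand-in for spark_session_db.connection(spark): each call returns a new object.
    return object()

shared = make_connection()
shared_factory = lambda: shared      # old style: every call returns the same connection
per_call_factory = make_connection   # new style: every call returns a fresh connection

assert shared_factory() is shared_factory()
assert per_call_factory() is not per_call_factory()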

sqlmesh/engines/spark/db_api/spark_session.py

Lines changed: 3 additions & 4 deletions
@@ -18,10 +18,6 @@ def execute(self, query: str, parameters: t.Optional[t.Any] = None) -> None:
         if parameters:
             raise NotSupportedError("Parameterized queries are not supported")
 
-        self._spark.sparkContext.setLocalProperty(
-            "spark.scheduler.pool", f"pool_{get_ident()}"
-        )
-
         self._last_df = self._spark.sql(query)
         self._last_output = None
         self._last_output_cursor = 0
@@ -71,6 +67,9 @@ def __init__(self, spark: SparkSession):
         self.spark = spark
 
     def cursor(self) -> SparkSessionCursor:
+        self.spark.sparkContext.setLocalProperty(
+            "spark.scheduler.pool", f"pool_{get_ident()}"
+        )
         return SparkSessionCursor(self.spark)
 
     def commit(self) -> None:
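For context, `spark.scheduler.pool` is a thread-local property of Spark's fair scheduler, so it must be set on the thread that actually submits jobs; setting it once per cursor keeps each worker thread in its own pool instead of re-setting it on every statement. A minimal standalone sketch of the idea (assumes a local SparkSession; the query is illustrative):

from threading import get_ident
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

def run_in_own_pool(query: str):
    # Tag jobs submitted by this thread with a pool name derived from its id,
    # mirroring the pool_{get_ident()} naming used in the diff above.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", f"pool_{get_ident()}")
    return spark.sql(query).collect()

run_in_own_pool("SELECT 1")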

sqlmesh/schedulers/airflow/client.py

Lines changed: 2 additions & 0 deletions
@@ -50,6 +50,7 @@ def apply_plan(
         no_gaps: bool = False,
         restatements: t.Optional[t.Iterable[str]] = None,
         notification_targets: t.Optional[t.List[NotificationTarget]] = None,
+        backfill_concurrent_tasks: int = 1,
         ddl_concurrent_tasks: int = 1,
         timestamp: t.Optional[datetime] = None,
     ) -> str:
@@ -63,6 +64,7 @@ def apply_plan(
                 request_id=request_id,
                 restatements=set(restatements or []),
                 notification_targets=notification_targets or [],
+                backfill_concurrent_tasks=backfill_concurrent_tasks,
                 ddl_concurrent_tasks=ddl_concurrent_tasks,
             ),
             dag_run_id=common.INIT_RUN_ID if is_first_run else None,

sqlmesh/schedulers/airflow/common.py

Lines changed: 2 additions & 0 deletions
@@ -47,6 +47,7 @@ class PlanReceiverDagConf(PydanticModel):
     no_gaps: bool
     restatements: t.Set[str]
     notification_targets: t.List[NotificationTarget]
+    backfill_concurrent_tasks: int
     ddl_concurrent_tasks: int
 
 
@@ -68,6 +69,7 @@ class PlanApplicationRequest(PydanticModel):
     plan_id: str
     previous_plan_id: t.Optional[str]
     notification_targets: t.List[NotificationTarget]
+    backfill_concurrent_tasks: int
     ddl_concurrent_tasks: int
 
 
sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 1 addition & 0 deletions
@@ -129,6 +129,7 @@ def _create_plan_application_dag(
         dag_id=dag_id,
         schedule_interval="@once",
         start_date=now(),
+        max_active_tasks=request.backfill_concurrent_tasks,
         catchup=False,
         is_paused_upon_creation=False,
         tags=[
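In Airflow (2.2+), `max_active_tasks` caps how many task instances of a single DAG run at the same time, so the generated plan-application DAG cannot fan out beyond the requested backfill concurrency. A standalone sketch of the setting (DAG id, dates, and tasks are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="bounded_backfill_example",   # illustrative
    schedule_interval="@once",
    start_date=datetime(2023, 1, 1),
    max_active_tasks=4,                  # at most 4 of this DAG's tasks run at once
    catchup=False,
) as dag:
    for i in range(10):
        PythonOperator(task_id=f"backfill_{i}", python_callable=lambda: None)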

sqlmesh/schedulers/airflow/integration.py

Lines changed: 2 additions & 1 deletion
@@ -112,7 +112,7 @@ def dags(self) -> t.List[DAG]:
     def _create_plan_receiver_dag(self) -> DAG:
         dag = self._create_system_dag(common.PLAN_RECEIVER_DAG_ID, None)
 
-        receiver_task = PythonOperator(
+        PythonOperator(
             task_id=common.PLAN_RECEIVER_TASK_ID,
             python_callable=_plan_receiver_task,
             dag=dag,
@@ -234,6 +234,7 @@ def _plan_receiver_task(
         plan_id=plan_conf.environment.plan_id,
         previous_plan_id=plan_conf.environment.previous_plan_id,
         notification_targets=plan_conf.notification_targets,
+        backfill_concurrent_tasks=plan_conf.backfill_concurrent_tasks,
         ddl_concurrent_tasks=plan_conf.ddl_concurrent_tasks,
     )
 