-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathmeta.py
More file actions
35 lines (26 loc) · 1.01 KB
/
meta.py
File metadata and controls
35 lines (26 loc) · 1.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from __future__ import annotations
from fastapi import APIRouter, Depends, Request
from sqlmesh.cli.main import _sqlmesh_version
from web.server import models
from web.server.console import api_console
from web.server.settings import Settings, get_settings
router = APIRouter()
@router.get(
"",
response_model=models.Meta,
response_model_exclude_unset=True,
)
def get_api_meta(
request: Request,
settings: Settings = Depends(get_settings),
) -> models.Meta:
"""Get the metadata"""
has_running_task = False
if models.Modules.PLANS in settings.modules:
has_running_task = hasattr(request.app.state, "task") and not request.app.state.task.done()
api_console.log_event_plan_overview()
api_console.log_event_plan_apply()
api_console.log_event_plan_cancel()
if not has_running_task and api_console.is_cancelling_plan():
api_console.finish_plan_cancellation()
return models.Meta(version=_sqlmesh_version(), has_running_task=has_running_task)