-
Notifications
You must be signed in to change notification settings - Fork 2.2k
fix #7502: write run_results.json for run operation #7655
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
a500e37
17d5e0d
252d4bc
8ce74ff
99c4523
f50328f
ac5e0c0
c3163d8
c38d618
e9cfe93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| kind: Fixes | ||
| body: write run_results.json for run operation | ||
| time: 2023-05-22T13:29:24.182612-07:00 | ||
| custom: | ||
| Author: aranke | ||
| Issue: "7502" |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,19 +1,25 @@ | ||||||
| from datetime import datetime | ||||||
| import os | ||||||
| import threading | ||||||
| import traceback | ||||||
| from datetime import datetime | ||||||
|
|
||||||
| import agate | ||||||
|
|
||||||
| from .base import ConfiguredTask | ||||||
|
|
||||||
| import dbt.exceptions | ||||||
| from dbt.adapters.factory import get_adapter | ||||||
| from dbt.contracts.results import RunOperationResultsArtifact | ||||||
| from dbt.contracts.files import FileHash | ||||||
| from dbt.contracts.graph.nodes import HookNode | ||||||
| from dbt.contracts.results import RunResultsArtifact, RunResult, RunStatus, TimingInfo | ||||||
| from dbt.events.functions import fire_event | ||||||
| from dbt.events.types import ( | ||||||
| RunningOperationCaughtError, | ||||||
| RunningOperationUncaughtError, | ||||||
| LogDebugStackTrace, | ||||||
| ) | ||||||
| from dbt.node_types import NodeType | ||||||
| from .base import ConfiguredTask | ||||||
|
|
||||||
| RESULT_FILE_NAME = "run_results.json" | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am assuming you don't need to define the name here? the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, they're defined inline: dbt-core/core/dbt/task/runnable.py Line 58 in dffbb6a
dbt-core/core/dbt/task/freshness.py Line 30 in dffbb6a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should put those constants in base task file or somewhere and import them in other tasks so the string is only being defined once. I will leave it to you to either do it here, a follow up PR, or not do anything.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll leave it as-is for now. |
||||||
|
|
||||||
|
|
||||||
| class RunOperationTask(ConfiguredTask): | ||||||
|
|
@@ -22,10 +28,13 @@ def _get_macro_parts(self): | |||||
| if "." in macro_name: | ||||||
| package_name, macro_name = macro_name.split(".", 1) | ||||||
| else: | ||||||
| package_name = None | ||||||
| package_name = self.config.project_name | ||||||
|
|
||||||
| return package_name, macro_name | ||||||
|
|
||||||
| def result_path(self): | ||||||
aranke marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| return os.path.join(self.config.target_path, RESULT_FILE_NAME) | ||||||
|
|
||||||
| def _run_unsafe(self) -> agate.Table: | ||||||
| adapter = get_adapter(self.config) | ||||||
|
|
||||||
|
|
@@ -40,7 +49,7 @@ def _run_unsafe(self) -> agate.Table: | |||||
|
|
||||||
| return res | ||||||
|
|
||||||
| def run(self) -> RunOperationResultsArtifact: | ||||||
| def run(self) -> RunResultsArtifact: | ||||||
| start = datetime.utcnow() | ||||||
| self.compile_manifest() | ||||||
| try: | ||||||
|
|
@@ -56,11 +65,46 @@ def run(self) -> RunOperationResultsArtifact: | |||||
| else: | ||||||
| success = True | ||||||
| end = datetime.utcnow() | ||||||
| return RunOperationResultsArtifact.from_success( | ||||||
|
|
||||||
| package_name, macro_name = self._get_macro_parts() | ||||||
| fqn = [NodeType.Operation, package_name, macro_name] | ||||||
| unique_id = ".".join(fqn) | ||||||
|
|
||||||
| run_result = RunResult( | ||||||
| adapter_response={}, | ||||||
| status=RunStatus.Success if success else RunStatus.Error, | ||||||
| execution_time=(end - start).total_seconds(), | ||||||
| failures=0 if success else 1, | ||||||
| message=None, | ||||||
| node=HookNode( | ||||||
| alias=macro_name, | ||||||
| checksum=FileHash.from_contents(unique_id), | ||||||
| database=self.config.credentials.database, | ||||||
| schema=self.config.credentials.schema, | ||||||
| resource_type=NodeType.Operation, | ||||||
| fqn=fqn, | ||||||
| name=macro_name, | ||||||
| unique_id=unique_id, | ||||||
| package_name=package_name, | ||||||
| path="", | ||||||
| original_file_path="", | ||||||
| ), | ||||||
| thread_id=threading.current_thread().name, | ||||||
| timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)], | ||||||
| ) | ||||||
|
|
||||||
| results = RunResultsArtifact.from_execution_results( | ||||||
| generated_at=end, | ||||||
| elapsed_time=(end - start).total_seconds(), | ||||||
| success=success, | ||||||
| args={ | ||||||
| k: v | ||||||
| for k, v in self.args.__dict__.items() | ||||||
| if k.islower() and type(v) in (str, int, float, bool, list, dict) | ||||||
| }, | ||||||
| results=[run_result], | ||||||
| ) | ||||||
| results.write(self.result_path()) | ||||||
aranke marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| return results | ||||||
|
|
||||||
| def interpret_results(self, results): | ||||||
| return results.success | ||||||
| return results.results[0].status == RunStatus.Success | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| import dbt | ||
| import jsonschema | ||
|
|
||
| from dbt.tests.util import run_dbt, get_artifact, check_datetime_between | ||
| from dbt.tests.util import run_dbt, get_artifact, check_datetime_between, run_dbt_and_capture | ||
| from tests.functional.artifacts.expected_manifest import ( | ||
| expected_seeded_manifest, | ||
| expected_references_manifest, | ||
|
|
@@ -17,7 +17,7 @@ | |
| ) | ||
|
|
||
| from dbt.contracts.graph.manifest import WritableManifest | ||
| from dbt.contracts.results import RunResultsArtifact | ||
| from dbt.contracts.results import RunResultsArtifact, RunStatus | ||
|
|
||
| models__schema_yml = """ | ||
| version: 2 | ||
|
|
@@ -129,6 +129,17 @@ | |
| select * from {{ ref('seed') }} | ||
| """ | ||
|
|
||
| models__model_with_pre_hook_sql = """ | ||
| {{ | ||
| config( | ||
| pre_hook={ | ||
| "sql": "{{ alter_timezone(timezone='Etc/UTC') }}" | ||
| } | ||
| ) | ||
| }} | ||
| select current_setting('timezone') as timezone | ||
| """ | ||
|
|
||
| seed__schema_yml = """ | ||
| version: 2 | ||
| seeds: | ||
|
|
@@ -184,6 +195,17 @@ | |
| {% endtest %} | ||
| """ | ||
|
|
||
| macros__alter_timezone_sql = """ | ||
| {% macro alter_timezone(timezone='America/Los_Angeles') %} | ||
| {% set sql %} | ||
| SET TimeZone='{{ timezone }}'; | ||
| {% endset %} | ||
|
|
||
| {% do run_query(sql) %} | ||
| {% do log("Timezone set to: " + timezone, info=True) %} | ||
| {% endmacro %} | ||
| """ | ||
|
|
||
| snapshot__snapshot_seed_sql = """ | ||
| {% snapshot snapshot_seed %} | ||
| {{ | ||
|
|
@@ -328,7 +350,6 @@ | |
|
|
||
| """ | ||
|
|
||
|
|
||
| versioned_models__schema_yml = """ | ||
| version: 2 | ||
|
|
||
|
|
@@ -508,7 +529,7 @@ def verify_run_results(project, expected_run_results, start_time, run_results_sc | |
| # sort the results so we can make reasonable assertions | ||
| run_results["results"].sort(key=lambda r: r["unique_id"]) | ||
| assert run_results["results"] == expected_run_results | ||
| set(run_results) == {"elapsed_time", "results", "metadata"} | ||
| assert set(run_results) == {"elapsed_time", "results", "metadata", "args"} | ||
|
|
||
|
|
||
| class BaseVerifyProject: | ||
|
|
@@ -649,3 +670,28 @@ def test_versions(self, project, manifest_schema_path, run_results_schema_path): | |
| verify_run_results( | ||
| project, expected_versions_run_results(), start_time, run_results_schema_path | ||
| ) | ||
|
|
||
|
|
||
| class TestVerifyRunOperation(BaseVerifyProject): | ||
| @pytest.fixture(scope="class") | ||
| def macros(self): | ||
| return {"alter_timezone.sql": macros__alter_timezone_sql} | ||
|
|
||
| @pytest.fixture(scope="class") | ||
| def models(self): | ||
| return { | ||
| "model_with_pre_hook.sql": models__model_with_pre_hook_sql, | ||
| } | ||
|
|
||
| def test_run_operation(self, project): | ||
| results, log_output = run_dbt_and_capture(["run-operation", "alter_timezone"]) | ||
| assert len(results) == 1 | ||
| assert results[0].status == RunStatus.Success | ||
| assert results[0].unique_id == "operation.test.alter_timezone" | ||
| assert "Timezone set to: America/Los_Angeles" in log_output | ||
|
|
||
| def test_run_model(self, project): | ||
|
||
| results, log_output = run_dbt_and_capture(["run", "--select", "model_with_pre_hook"]) | ||
| assert len(results) == 1 | ||
| assert results[0].status == RunStatus.Success | ||
| assert "Timezone set to: Etc/UTC" in log_output | ||
Uh oh!
There was an error while loading. Please reload this page.