-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Fixes #7785: fail-fast behavior #8066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
e8018f7
7b71ea0
4d4ba6c
86b030c
b295bf5
1813c03
56bcfd2
60f1cc9
5ab8438
717bf31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,29 +1,22 @@ | ||
| import os | ||
| import time | ||
| from pathlib import Path | ||
| from abc import abstractmethod | ||
| from concurrent.futures import as_completed | ||
| from datetime import datetime | ||
| from multiprocessing.dummy import Pool as ThreadPool | ||
| from pathlib import Path | ||
| from typing import Optional, Dict, List, Set, Tuple, Iterable, AbstractSet | ||
|
|
||
| from .printer import ( | ||
| print_run_result_error, | ||
| print_run_end_messages, | ||
| ) | ||
|
|
||
| from dbt.task.base import ConfiguredTask | ||
| import dbt.exceptions | ||
| import dbt.tracking | ||
| import dbt.utils | ||
| from dbt.adapters.base import BaseRelation | ||
| from dbt.adapters.factory import get_adapter | ||
| from dbt.logger import ( | ||
| DbtProcessState, | ||
| TextOnly, | ||
| UniqueID, | ||
| TimestampNamed, | ||
| DbtModelState, | ||
| ModelMetadata, | ||
| NodeCount, | ||
| ) | ||
| from dbt.contracts.graph.manifest import WritableManifest | ||
| from dbt.contracts.graph.nodes import ResultNode | ||
| from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus | ||
| from dbt.contracts.state import PreviousState | ||
| from dbt.events.contextvars import log_contextvars, task_contextvars | ||
| from dbt.events.functions import fire_event, warn_or_error | ||
| from dbt.events.types import ( | ||
| Formatting, | ||
|
|
@@ -36,25 +29,29 @@ | |
| EndRunResult, | ||
| NothingToDo, | ||
| ) | ||
| from dbt.events.contextvars import log_contextvars, task_contextvars | ||
| from dbt.contracts.graph.nodes import ResultNode | ||
| from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus | ||
| from dbt.contracts.state import PreviousState | ||
| from dbt.exceptions import ( | ||
| DbtInternalError, | ||
| NotImplementedError, | ||
| DbtRuntimeError, | ||
| FailFastError, | ||
| ) | ||
|
|
||
| from dbt.flags import get_flags | ||
| from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, parse_difference | ||
| from dbt.logger import ( | ||
| DbtProcessState, | ||
| TextOnly, | ||
| UniqueID, | ||
| TimestampNamed, | ||
| DbtModelState, | ||
| ModelMetadata, | ||
| NodeCount, | ||
| ) | ||
| from dbt.parser.manifest import write_manifest | ||
| import dbt.tracking | ||
|
|
||
| import dbt.exceptions | ||
| from dbt.flags import get_flags | ||
| import dbt.utils | ||
| from dbt.contracts.graph.manifest import WritableManifest | ||
| from dbt.task.base import ConfiguredTask, skip_result | ||
| from .printer import ( | ||
| print_run_result_error, | ||
| print_run_end_messages, | ||
| ) | ||
|
|
||
| RESULT_FILE_NAME = "run_results.json" | ||
| RUNNING_STATE = DbtProcessState("running") | ||
|
|
@@ -360,21 +357,25 @@ def execute_nodes(self): | |
| pool = ThreadPool(num_threads) | ||
| try: | ||
| self.run_queue(pool) | ||
|
|
||
| except FailFastError as failure: | ||
| self._cancel_connections(pool) | ||
|
|
||
| node_results_ids = [r.node.unique_id for r in self.node_results] | ||
aranke marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| for r in self._flattened_nodes: | ||
| if r.unique_id not in node_results_ids: | ||
| self.node_results.append(skip_result(r, "Skipping due to fail_fast")) | ||
|
|
||
| print_run_result_error(failure.result) | ||
| raise | ||
|
|
||
| except KeyboardInterrupt: | ||
| self._cancel_connections(pool) | ||
| print_run_end_messages(self.node_results, keyboard_interrupt=True) | ||
| raise | ||
|
|
||
| pool.close() | ||
| pool.join() | ||
|
|
||
| return self.node_results | ||
| finally: | ||
| pool.close() | ||
| pool.join() | ||
| return self.node_results | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity, why return is also moved into the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe |
||
|
|
||
| def _mark_dependent_errors(self, node_id, result, cause): | ||
| if self.graph is None: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -145,22 +145,6 @@ def test_run_operation(self, project): | |
| results = run_dbt(["retry"], expect_pass=False) | ||
| assert {n.unique_id: n.status for n in results.results} == expected_statuses | ||
|
|
||
| def test_fail_fast(self, project): | ||
| result = run_dbt(["--warn-error", "build", "--fail-fast"], expect_pass=False) | ||
|
|
||
| assert result.status == RunStatus.Error | ||
| assert result.node.name == "sample_model" | ||
|
|
||
| results = run_dbt(["retry"], expect_pass=False) | ||
|
|
||
| assert len(results.results) == 1 | ||
| assert results.results[0].status == RunStatus.Error | ||
| assert results.results[0].node.name == "sample_model" | ||
|
|
||
| result = run_dbt(["retry", "--fail-fast"], expect_pass=False) | ||
| assert result.status == RunStatus.Error | ||
| assert result.node.name == "sample_model" | ||
|
|
||
| def test_removed_file(self, project): | ||
| run_dbt(["build"], expect_pass=False) | ||
|
|
||
|
|
@@ -180,3 +164,57 @@ def test_removed_file_leaf_node(self, project): | |
| rm_file("models", "third_model.sql") | ||
| with pytest.raises(ValueError, match="Couldn't find model 'model.test.third_model'"): | ||
| run_dbt(["retry"], expect_pass=False) | ||
|
|
||
|
|
||
| class TestFailFast: | ||
| @pytest.fixture(scope="class") | ||
| def models(self): | ||
| return { | ||
| "sample_model.sql": models__sample_model, | ||
| "second_model.sql": models__second_model, | ||
| "union_model.sql": models__union_model, | ||
| "final_model.sql": "select * from {{ ref('union_model') }};", | ||
| } | ||
|
|
||
| def test_fail_fast(self, project): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Love the functional test here! |
||
| results = run_dbt(["--fail-fast", "build"], expect_pass=False) | ||
| assert {r.node.unique_id: r.status for r in results.results} == { | ||
| "model.test.sample_model": RunStatus.Error, | ||
| "model.test.second_model": RunStatus.Success, | ||
| "model.test.union_model": RunStatus.Skipped, | ||
| "model.test.final_model": RunStatus.Skipped, | ||
| } | ||
|
|
||
| # Check that retry inherits fail-fast from upstream command (build) | ||
| results = run_dbt(["retry"], expect_pass=False) | ||
| assert {r.node.unique_id: r.status for r in results.results} == { | ||
| "model.test.sample_model": RunStatus.Error, | ||
| "model.test.union_model": RunStatus.Skipped, | ||
| "model.test.final_model": RunStatus.Skipped, | ||
| } | ||
|
|
||
| fixed_sql = "select 1 as id, 1 as foo" | ||
| write_file(fixed_sql, "models", "sample_model.sql") | ||
|
|
||
| results = run_dbt(["retry"], expect_pass=False) | ||
| assert {r.node.unique_id: r.status for r in results.results} == { | ||
| "model.test.sample_model": RunStatus.Success, | ||
| "model.test.union_model": RunStatus.Success, | ||
| "model.test.final_model": RunStatus.Error, | ||
| } | ||
|
|
||
| results = run_dbt(["retry"], expect_pass=False) | ||
| assert {r.node.unique_id: r.status for r in results.results} == { | ||
| "model.test.final_model": RunStatus.Error, | ||
| } | ||
|
|
||
| fixed_sql = "select * from {{ ref('union_model') }}" | ||
| write_file(fixed_sql, "models", "final_model.sql") | ||
|
|
||
| results = run_dbt(["retry"]) | ||
| assert {r.node.unique_id: r.status for r in results.results} == { | ||
| "model.test.final_model": RunStatus.Success, | ||
| } | ||
|
|
||
| results = run_dbt(["retry"]) | ||
| assert {r.node.unique_id: r.status for r in results.results} == {} | ||
Uh oh!
There was an error while loading. Please reload this page.