From 5f564167755b724b8e55cbdde7bfaa538cc06918 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Mon, 8 May 2023 15:08:24 -0700 Subject: [PATCH 01/11] cherry pick the right stuff --- .../unreleased/Fixes-20230508-044922.yaml | 6 +++ core/dbt/task/show.py | 9 +++- tests/functional/show/fixtures.py | 37 +++++++++++++ tests/functional/show/test_show.py | 52 +++++++++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 .changes/unreleased/Fixes-20230508-044922.yaml diff --git a/.changes/unreleased/Fixes-20230508-044922.yaml b/.changes/unreleased/Fixes-20230508-044922.yaml new file mode 100644 index 00000000000..9847c2224b1 --- /dev/null +++ b/.changes/unreleased/Fixes-20230508-044922.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: enable dbt show for seeds +time: 2023-05-08T04:49:22.82093-07:00 +custom: + Author: aranke + Issue: "7273" diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index 58e0948bcfb..4e02fc76cea 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -2,12 +2,14 @@ import threading import time +from dbt.contracts.graph.nodes import SeedNode from dbt.contracts.results import RunResult, RunStatus from dbt.events.base_types import EventLevel from dbt.events.functions import fire_event from dbt.events.types import ShowNode, Note from dbt.exceptions import DbtRuntimeError from dbt.task.compile import CompileTask, CompileRunner +from dbt.task.seed import SeedRunner class ShowRunner(CompileRunner): @@ -41,8 +43,11 @@ def _runtime_initialize(self): raise DbtRuntimeError("Either --select or --inline must be passed to show") super()._runtime_initialize() - def get_runner_type(self, _): - return ShowRunner + def get_runner_type(self, node): + if isinstance(node, SeedNode): + return SeedRunner + else: + return ShowRunner def task_end_messages(self, results): is_inline = bool(getattr(self.args, "inline", None)) diff --git a/tests/functional/show/fixtures.py b/tests/functional/show/fixtures.py index 6fa89b32893..d3d8c57e96c 100644 --- a/tests/functional/show/fixtures.py +++ b/tests/functional/show/fixtures.py @@ -10,6 +10,43 @@ from {{ ref('sample_model') }} """ +models__sql_header = """ +{% call set_sql_header(config) %} +set session time zone 'Asia/Kolkata'; +{%- endcall %} +select current_setting('timezone') as timezone +""" + + +schema_yml = """ +models: + - name: sample_model + latest_version: 1 + + # declare the versions, and fully specify them + versions: + - v: 2 + config: + materialized: table + columns: + - name: sample_num + data_type: int + - name: sample_bool + data_type: bool + - name: answer + data_type: int + + - v: 1 + config: + materialized: table + contract: {enforced: true} + columns: + - name: sample_num + data_type: int + - name: sample_bool + data_type: bool +""" + models__ephemeral_model = """ {{ config(materialized = 'ephemeral') }} select diff --git a/tests/functional/show/test_show.py b/tests/functional/show/test_show.py index c5684197ec5..69686ebb09b 100644 --- a/tests/functional/show/test_show.py +++ b/tests/functional/show/test_show.py @@ -8,6 +8,8 @@ models__sample_model, models__second_model, models__ephemeral_model, + schema_yml, + models__sql_header, ) @@ -18,6 +20,7 @@ def models(self): "sample_model.sql": models__sample_model, "second_model.sql": models__second_model, "ephemeral_model.sql": models__ephemeral_model, + "sql_header.sql": models__sql_header, } @pytest.fixture(scope="class") @@ -85,3 +88,52 @@ def test_second_ephemeral_model(self, project): ["show", "--inline", models__second_ephemeral_model] ) assert "col_hundo" in log_output + + @pytest.mark.parametrize( + "args,expected", + [ + ([], 5), # default limit + (["--limit", 3], 3), # fetch 3 rows + (["--limit", -1], 7), # fetch all rows + ], + ) + def test_limit(self, project, args, expected): + run_dbt(["build"]) + dbt_args = ["show", "--inline", models__second_ephemeral_model, *args] + results, log_output = run_dbt_and_capture(dbt_args) + assert len(results.results[0].agate_table) == expected + + def test_seed(self, project): + (results, log_output) = run_dbt_and_capture(["show", "--select", "sample_seed"]) + assert "Previewing node 'sample_seed'" in log_output + + def test_sql_header(self, project): + run_dbt(["build"]) + (results, log_output) = run_dbt_and_capture(["show", "--select", "sql_header"]) + assert "Asia/Kolkata" in log_output + + +class TestShowModelVersions: + @pytest.fixture(scope="class") + def models(self): + return { + "schema.yml": schema_yml, + "sample_model.sql": models__sample_model, + "sample_model_v2.sql": models__second_model, + } + + @pytest.fixture(scope="class") + def seeds(self): + return {"sample_seed.csv": seeds__sample_seed} + + def test_version_unspecified(self, project): + run_dbt(["build"]) + (results, log_output) = run_dbt_and_capture(["show", "--select", "sample_model"]) + assert "Previewing node 'sample_model.v1'" in log_output + assert "Previewing node 'sample_model.v2'" in log_output + + def test_none(self, project): + run_dbt(["build"]) + (results, log_output) = run_dbt_and_capture(["show", "--select", "sample_model.v2"]) + assert "Previewing node 'sample_model.v1'" not in log_output + assert "Previewing node 'sample_model.v2'" in log_output From 685e06fd9ffbb8b9a93732f083bb84f4c6738be0 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 9 May 2023 14:44:35 -0700 Subject: [PATCH 02/11] Update show.py --- core/dbt/task/show.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index 4e02fc76cea..f9af847e874 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -19,8 +19,17 @@ def __init__(self, config, adapter, node, node_index, num_nodes): def execute(self, compiled_node, manifest): start_time = time.time() + + # Allow passing in -1 (or any negative number) to get all rows + limit = None if self.config.args.limit < 0 else self.config.args.limit + + if "sql_header" in compiled_node.unrendered_config: + compiled_node.compiled_code = ( + compiled_node.unrendered_config["sql_header"] + compiled_node.compiled_code + ) + adapter_response, execute_result = self.adapter.execute( - compiled_node.compiled_code, fetch=True + compiled_node.compiled_code, fetch=True, limit=limit ) end_time = time.time() @@ -66,13 +75,8 @@ def task_end_messages(self, results): ) for result in matched_results: - # Allow passing in -1 (or any negative number) to get all rows table = result.agate_table - if self.args.limit >= 0: - table = table.limit(self.args.limit) - result.agate_table = table - # Hack to get Agate table output as string output = io.StringIO() if self.args.output == "json": @@ -80,9 +84,14 @@ def task_end_messages(self, results): else: table.print_table(output=output, max_rows=None) + node_name = result.node.name + + if hasattr(result.node, "version") and result.node.version: + node_name += f".v{result.node.version}" + fire_event( ShowNode( - node_name=result.node.name, + node_name=node_name, preview=output.getvalue(), is_inline=is_inline, output_format=self.args.output, From 89ecbb91eaaba59c52e163cc2bc56c4226219584 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 9 May 2023 14:50:37 -0700 Subject: [PATCH 03/11] update impl.py --- core/dbt/adapters/base/impl.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index a150f8296b8..7bb16c7ea4e 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -274,7 +274,7 @@ def connection_for(self, node: ResultNode) -> Iterator[None]: @available.parse(lambda *a, **k: ("", empty_table())) def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False + self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None ) -> Tuple[AdapterResponse, agate.Table]: """Execute the given SQL. This is a thin wrapper around ConnectionManager.execute. @@ -283,10 +283,11 @@ def execute( :param bool auto_begin: If set, and dbt is not currently inside a transaction, automatically begin one. :param bool fetch: If set, fetch results. + :param Optional[int] limit: If set, only fetch n number of rows :return: A tuple of the query status and results (empty if fetch=False). :rtype: Tuple[AdapterResponse, agate.Table] """ - return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch) + return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch, limit=limit) @available.parse(lambda *a, **k: []) def get_column_schema_from_query(self, sql: str) -> List[BaseColumn]: From 17e3d0d172b53b47c18f2802e637ec1e574d830e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 9 May 2023 15:01:17 -0700 Subject: [PATCH 04/11] update connections.py --- core/dbt/adapters/sql/connections.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/dbt/adapters/sql/connections.py b/core/dbt/adapters/sql/connections.py index 88e4a30d0b6..464c07871a0 100644 --- a/core/dbt/adapters/sql/connections.py +++ b/core/dbt/adapters/sql/connections.py @@ -118,13 +118,16 @@ def process_results( return [dict(zip(column_names, row)) for row in rows] @classmethod - def get_result_from_cursor(cls, cursor: Any) -> agate.Table: + def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> agate.Table: data: List[Any] = [] column_names: List[str] = [] if cursor.description is not None: column_names = [col[0] for col in cursor.description] - rows = cursor.fetchall() + if limit: + rows = cursor.fetchmany(limit) + else: + rows = cursor.fetchall() data = cls.process_results(column_names, rows) return dbt.clients.agate_helper.table_from_data_flat(data, column_names) @@ -138,13 +141,13 @@ def data_type_code_to_name(cls, type_code: Union[int, str]) -> str: ) def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False + self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None ) -> Tuple[AdapterResponse, agate.Table]: sql = self._add_query_comment(sql) _, cursor = self.add_query(sql, auto_begin) response = self.get_response(cursor) if fetch: - table = self.get_result_from_cursor(cursor) + table = self.get_result_from_cursor(cursor, limit) else: table = dbt.clients.agate_helper.empty_table() return response, table From 0ac734ec3d580538fc5b9d1439b29fe76b9e14f9 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 9 May 2023 15:05:01 -0700 Subject: [PATCH 05/11] Delete Fixes-20230508-044922.yaml --- .changes/unreleased/Fixes-20230508-044922.yaml | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 .changes/unreleased/Fixes-20230508-044922.yaml diff --git a/.changes/unreleased/Fixes-20230508-044922.yaml b/.changes/unreleased/Fixes-20230508-044922.yaml deleted file mode 100644 index 9847c2224b1..00000000000 --- a/.changes/unreleased/Fixes-20230508-044922.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Fixes -body: enable dbt show for seeds -time: 2023-05-08T04:49:22.82093-07:00 -custom: - Author: aranke - Issue: "7273" From b8a8060f6a3e8d38050d7b90ab81afbdee222fbb Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 11 May 2023 08:52:28 -0700 Subject: [PATCH 06/11] Restore core/dbt/adapters/base/impl.py --- core/dbt/adapters/base/impl.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 7bb16c7ea4e..a150f8296b8 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -274,7 +274,7 @@ def connection_for(self, node: ResultNode) -> Iterator[None]: @available.parse(lambda *a, **k: ("", empty_table())) def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None + self, sql: str, auto_begin: bool = False, fetch: bool = False ) -> Tuple[AdapterResponse, agate.Table]: """Execute the given SQL. This is a thin wrapper around ConnectionManager.execute. @@ -283,11 +283,10 @@ def execute( :param bool auto_begin: If set, and dbt is not currently inside a transaction, automatically begin one. :param bool fetch: If set, fetch results. - :param Optional[int] limit: If set, only fetch n number of rows :return: A tuple of the query status and results (empty if fetch=False). :rtype: Tuple[AdapterResponse, agate.Table] """ - return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch, limit=limit) + return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch) @available.parse(lambda *a, **k: []) def get_column_schema_from_query(self, sql: str) -> List[BaseColumn]: From e548886de1d84cb78c05c900b3f31599fb1147e0 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 11 May 2023 08:52:41 -0700 Subject: [PATCH 07/11] Restore core/dbt/adapters/sql/connections.py --- core/dbt/adapters/sql/connections.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/core/dbt/adapters/sql/connections.py b/core/dbt/adapters/sql/connections.py index 464c07871a0..88e4a30d0b6 100644 --- a/core/dbt/adapters/sql/connections.py +++ b/core/dbt/adapters/sql/connections.py @@ -118,16 +118,13 @@ def process_results( return [dict(zip(column_names, row)) for row in rows] @classmethod - def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> agate.Table: + def get_result_from_cursor(cls, cursor: Any) -> agate.Table: data: List[Any] = [] column_names: List[str] = [] if cursor.description is not None: column_names = [col[0] for col in cursor.description] - if limit: - rows = cursor.fetchmany(limit) - else: - rows = cursor.fetchall() + rows = cursor.fetchall() data = cls.process_results(column_names, rows) return dbt.clients.agate_helper.table_from_data_flat(data, column_names) @@ -141,13 +138,13 @@ def data_type_code_to_name(cls, type_code: Union[int, str]) -> str: ) def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None + self, sql: str, auto_begin: bool = False, fetch: bool = False ) -> Tuple[AdapterResponse, agate.Table]: sql = self._add_query_comment(sql) _, cursor = self.add_query(sql, auto_begin) response = self.get_response(cursor) if fetch: - table = self.get_result_from_cursor(cursor, limit) + table = self.get_result_from_cursor(cursor) else: table = dbt.clients.agate_helper.empty_table() return response, table From 7d42c7888273f7b26d2b3ebdca9e7a3a0fe715af Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 11 May 2023 08:54:31 -0700 Subject: [PATCH 08/11] Update test_show.py --- tests/functional/show/test_show.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tests/functional/show/test_show.py b/tests/functional/show/test_show.py index 69686ebb09b..d2033f9b82a 100644 --- a/tests/functional/show/test_show.py +++ b/tests/functional/show/test_show.py @@ -89,20 +89,6 @@ def test_second_ephemeral_model(self, project): ) assert "col_hundo" in log_output - @pytest.mark.parametrize( - "args,expected", - [ - ([], 5), # default limit - (["--limit", 3], 3), # fetch 3 rows - (["--limit", -1], 7), # fetch all rows - ], - ) - def test_limit(self, project, args, expected): - run_dbt(["build"]) - dbt_args = ["show", "--inline", models__second_ephemeral_model, *args] - results, log_output = run_dbt_and_capture(dbt_args) - assert len(results.results[0].agate_table) == expected - def test_seed(self, project): (results, log_output) = run_dbt_and_capture(["show", "--select", "sample_seed"]) assert "Previewing node 'sample_seed'" in log_output From ac9801baaf723ef494f1df7131fdd01713fd890b Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 11 May 2023 08:58:05 -0700 Subject: [PATCH 09/11] Restore core/dbt/task/show.py --- core/dbt/task/show.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index f9af847e874..1b84b85e093 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -2,14 +2,12 @@ import threading import time -from dbt.contracts.graph.nodes import SeedNode from dbt.contracts.results import RunResult, RunStatus from dbt.events.base_types import EventLevel from dbt.events.functions import fire_event from dbt.events.types import ShowNode, Note from dbt.exceptions import DbtRuntimeError from dbt.task.compile import CompileTask, CompileRunner -from dbt.task.seed import SeedRunner class ShowRunner(CompileRunner): @@ -19,17 +17,8 @@ def __init__(self, config, adapter, node, node_index, num_nodes): def execute(self, compiled_node, manifest): start_time = time.time() - - # Allow passing in -1 (or any negative number) to get all rows - limit = None if self.config.args.limit < 0 else self.config.args.limit - - if "sql_header" in compiled_node.unrendered_config: - compiled_node.compiled_code = ( - compiled_node.unrendered_config["sql_header"] + compiled_node.compiled_code - ) - adapter_response, execute_result = self.adapter.execute( - compiled_node.compiled_code, fetch=True, limit=limit + compiled_node.compiled_code, fetch=True ) end_time = time.time() @@ -52,11 +41,8 @@ def _runtime_initialize(self): raise DbtRuntimeError("Either --select or --inline must be passed to show") super()._runtime_initialize() - def get_runner_type(self, node): - if isinstance(node, SeedNode): - return SeedRunner - else: - return ShowRunner + def get_runner_type(self, _): + return ShowRunner def task_end_messages(self, results): is_inline = bool(getattr(self.args, "inline", None)) @@ -75,8 +61,13 @@ def task_end_messages(self, results): ) for result in matched_results: + # Allow passing in -1 (or any negative number) to get all rows table = result.agate_table + if self.args.limit >= 0: + table = table.limit(self.args.limit) + result.agate_table = table + # Hack to get Agate table output as string output = io.StringIO() if self.args.output == "json": From 51a99e02b253d16a3badd89f2e7c3381acee09f4 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 11 May 2023 09:00:52 -0700 Subject: [PATCH 10/11] revert show.py --- core/dbt/task/show.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index 1b84b85e093..0ea7160bfa7 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -2,12 +2,14 @@ import threading import time +from dbt.contracts.graph.nodes import SeedNode from dbt.contracts.results import RunResult, RunStatus from dbt.events.base_types import EventLevel from dbt.events.functions import fire_event from dbt.events.types import ShowNode, Note from dbt.exceptions import DbtRuntimeError from dbt.task.compile import CompileTask, CompileRunner +from dbt.task.seed import SeedRunner class ShowRunner(CompileRunner): @@ -41,8 +43,11 @@ def _runtime_initialize(self): raise DbtRuntimeError("Either --select or --inline must be passed to show") super()._runtime_initialize() - def get_runner_type(self, _): - return ShowRunner + def get_runner_type(self, node): + if isinstance(node, SeedNode): + return SeedRunner + else: + return ShowRunner def task_end_messages(self, results): is_inline = bool(getattr(self.args, "inline", None)) From 24531889516d2be11ece9d839dae62d942222f78 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 11 May 2023 09:03:07 -0700 Subject: [PATCH 11/11] add back sql header --- core/dbt/task/show.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index 0ea7160bfa7..19681b3a0c3 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -19,6 +19,12 @@ def __init__(self, config, adapter, node, node_index, num_nodes): def execute(self, compiled_node, manifest): start_time = time.time() + + if "sql_header" in compiled_node.unrendered_config: + compiled_node.compiled_code = ( + compiled_node.unrendered_config["sql_header"] + compiled_node.compiled_code + ) + adapter_response, execute_result = self.adapter.execute( compiled_node.compiled_code, fetch=True )