Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Experiment: column types from empty query
  • Loading branch information
jtcohen6 authored and MichelleArk committed Feb 15, 2023
commit e8399cd39561b648d460d4fc227e7e0a6eb8c612
16 changes: 16 additions & 0 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,22 @@ def execute(
"""
return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch)

@available.parse(lambda *a, **k: [])
def get_column_schema_from_query(
    self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> List[Tuple[str, str]]:
    """Get the column schema (name, data type) of the given query's result
    set, without fetching any rows. This is a thin wrapper around
    ConnectionManager.get_column_schema_from_query.

    :param str sql: The sql to execute.
    :param bool auto_begin: Ignored; accepted only for signature
        compatibility with execute(). TODO confirm no callers rely on it.
    :param bool fetch: Ignored; accepted only for signature
        compatibility with execute().
    :return: A list of (column_name, data_type) pairs describing the
        query's result columns.
    :rtype: List[Tuple[str, str]]
    """
    # auto_begin/fetch are deliberately not forwarded: the connections-layer
    # implementation only needs the SQL text.
    return self.connections.get_column_schema_from_query(sql=sql)

@available.parse(lambda *a, **k: ("", empty_table()))
def get_partitions_metadata(self, table: str) -> Tuple[agate.Table]:
"""Obtain partitions metadata for a BigQuery partitioned table.
Expand Down
39 changes: 39 additions & 0 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,31 @@ def get_result_from_cursor(cls, cursor: Any) -> agate.Table:

return dbt.clients.agate_helper.table_from_data_flat(data, column_names)

@classmethod
def data_type_code_to_name(cls, type_code: int) -> str:
    """Get the string representation of the data type from the type_code.

    Type codes are driver-specific, so adapters must override this.
    See https://peps.python.org/pep-0249/#type-objects

    :param int type_code: The DB-API type_code from cursor.description.
    :return: The human-readable name of the data type.
    """
    # NOTE: the original signature named this parameter `int` (shadowing the
    # builtin) — it was clearly meant to be `type_code: int`, matching the
    # postgres override.
    raise dbt.exceptions.NotImplementedError(
        "`data_type_code_to_name` is not implemented for this adapter!"
    )

@classmethod
def get_column_schema_from_cursor(cls, cursor: Any) -> List[Tuple[str, str]]:
    """Describe the cursor's result set as (column_name, data_type) pairs.

    Returns an empty list when the cursor has no result set, i.e. its
    DB-API `description` is None.
    """
    # https://peps.python.org/pep-0249/#description
    description = cursor.description
    if description is None:
        return []
    # TODO: ignoring size, precision, scale for now
    # (though it is part of DB-API standard, and our Column class does have these attributes)
    # IMO user-defined contracts shouldn't have to match an exact size/precision/scale
    schema: List[Tuple[str, str]] = []
    for col in description:
        name, type_code = col[0], col[1]
        schema.append((name, cls.data_type_code_to_name(type_code)))
    return schema

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, agate.Table]:
Expand All @@ -140,6 +165,20 @@ def execute(
table = dbt.clients.agate_helper.empty_table()
return response, table

# TODO: do we need to care about auto_begin here?
def get_column_schema_from_query(self, sql: str) -> List[Tuple[str, str]]:
    """Run `sql` (with the standard query comment prepended) and describe
    the resulting cursor's columns as (column_name, data_type) pairs."""
    commented_sql = self._add_query_comment(sql)
    _, cursor = self.add_query(commented_sql)
    return self.get_column_schema_from_cursor(cursor)

# NOTE: the equivalent implementation for dbt-bigquery would look like:
# def get_column_schema_from_query(self, sql: str) -> List[Tuple[str, str]]:
#     sql = self._add_query_comment(sql)
#     # auto_begin is ignored on bigquery, and only included for consistency
#     query_job, iterator = self.raw_execute(sql)
#     # fields come from the RowIterator's schema — TODO confirm attribute name
#     return [(field.name, field.field_type) for field in iterator.schema]

def add_begin_query(self):
    """Open a transaction by sending an explicit BEGIN statement."""
    begin_sql = "BEGIN"
    # auto_begin=False: we are issuing the BEGIN ourselves, so the query
    # machinery must not try to open another transaction first.
    return self.add_query(begin_sql, auto_begin=False)

Expand Down
38 changes: 31 additions & 7 deletions core/dbt/include/global_project/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,47 @@
{% endmacro %}


{# Dispatch shim: routes to the adapter-specific implementation of
   get_empty_subquery_sql (default below). #}
{% macro get_empty_subquery_sql(select_sql) -%}
  {{ return(adapter.dispatch('get_empty_subquery_sql', 'dbt')(select_sql)) }}
{% endmacro %}

{# Wrap an arbitrary select statement in a zero-row subquery: the database
   still reports the result set's column metadata without scanning any data. #}
{% macro default__get_empty_subquery_sql(select_sql) %}
    select * from (
        {{ select_sql }}
    ) as __dbt_sbq
    where false
    limit 0
{% endmacro %}

{# Dispatch shim: routes to the adapter-specific implementation of
   get_column_schema_from_query (default below). #}
{% macro get_column_schema_from_query(select_sql) -%}
  {{ return(adapter.dispatch('get_column_schema_from_query', 'dbt')(select_sql)) }}
{% endmacro %}

{# Return a list of api.Column objects describing the result set of
   `select_sql`, obtained by running it as a zero-row subquery and reading
   cursor metadata via adapter.get_column_schema_from_query. #}
{% macro default__get_column_schema_from_query(select_sql) %}
    {% set columns = [] %}
    {# wrap in an empty subquery so no rows are fetched #}
    {% set sql = get_empty_subquery_sql(select_sql) %}
    {% set column_schema = adapter.get_column_schema_from_query(sql) %}
    {% for col in column_schema %}
    -- api.Column.create includes a step for translating data type
    -- TODO: could include size, precision, scale here
    {% set column = api.Column.create(col[0], col[1]) %}
    {% do columns.append(column) %}
    {% endfor %}
    {{ return(columns) }}
{% endmacro %}

{# here for back compat: returns only column names; prefer
   get_column_schema_from_query for name + data type #}
{% macro get_columns_in_query(select_sql) -%}
  {{ return(adapter.dispatch('get_columns_in_query', 'dbt')(select_sql)) }}
{% endmacro %}

{# Legacy: return just the column *names* of the query's result set, by
   executing the zero-row subquery and reading the empty result table. #}
{% macro default__get_columns_in_query(select_sql) %}
    {% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%}
        {{ get_empty_subquery_sql(select_sql) }}
    {% endcall %}
    {{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
{% endmacro %}


{# Dispatch shim: routes to the adapter-specific ALTER COLUMN TYPE
   implementation, since DDL syntax varies per warehouse. #}
{% macro alter_column_type(relation, column_name, new_column_type) -%}
  {{ return(adapter.dispatch('alter_column_type', 'dbt')(relation, column_name, new_column_type)) }}
{% endmacro %}
Expand Down
9 changes: 9 additions & 0 deletions plugins/postgres/dbt/adapters/postgres/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager

import psycopg2
from psycopg2.extensions import string_types

import dbt.exceptions
from dbt.adapters.base import Credentials
Expand Down Expand Up @@ -190,3 +191,11 @@ def get_response(cls, cursor) -> AdapterResponse:
status_messsage_strings = [part for part in status_message_parts if not part.isdigit()]
code = " ".join(status_messsage_strings)
return AdapterResponse(_message=message, code=code, rows_affected=rows)

@classmethod
def data_type_code_to_name(cls, type_code: int) -> str:
    """Map a DB-API type_code (a Postgres type OID) to its type name."""
    # psycopg2's string_types maps a Postgres OID (the type_code from
    # cursor.description) to a type object whose .name is the
    # human-readable data type name.
    return string_types[type_code].name

# NOTE: the equivalent implementation for dbt-snowflake would be:
#     from snowflake.connector.constants import FIELD_ID_TO_NAME
#     return FIELD_ID_TO_NAME[type_code]