Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230823-194237.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Update typing to meet mypy standards
time: 2023-08-23T19:42:37.130694-04:00
custom:
Author: mikealfare
Issue: "8396"
23 changes: 22 additions & 1 deletion core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,15 +400,36 @@ def _add_query_comment(self, sql: str) -> str:

@abc.abstractmethod
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was leftover from the implementation of limit in a previous change. The type here matches that of the original change.

) -> Tuple[AdapterResponse, agate.Table]:
"""Execute the given SQL.

:param str sql: The sql to execute.
:param bool auto_begin: If set, and dbt is not currently inside a
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:param int limit: If set, limits the result set
:return: A tuple of the query status and results (empty if fetch=False).
:rtype: Tuple[AdapterResponse, agate.Table]
"""
raise dbt.exceptions.NotImplementedError("`execute` is not implemented for this adapter!")

def add_select_query(self, sql: str) -> Tuple[Connection, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says it all. This is overridden in SQLConnectionManager, but is used in BaseAdapter, which uses ConnectionManager. This is not expected to exist today, and the BaseAdapter method only works if this method is defined anyway.

"""
This was added here because base.impl.BaseAdapter.get_column_schema_from_query expects it to be here.
That method wouldn't work unless the adapter used sql.impl.SQLAdapter, sql.connections.SQLConnectionManager
or defined this method on <Adapter>ConnectionManager before passing it in to <Adapter>Adapter.

See https://github.com/dbt-labs/dbt-core/issues/8396 for more information.
"""
raise dbt.exceptions.NotImplementedError(
"`add_select_query` is not implemented for this adapter!"
)

@classmethod
def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved up from SQLConnection.

"""Get the string representation of the data type from the type_code."""
# https://peps.python.org/pep-0249/#type-objects
raise dbt.exceptions.NotImplementedError(
"`data_type_code_to_name` is not implemented for this adapter!"
)
24 changes: 16 additions & 8 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
UnexpectedNullError,
)

from dbt.adapters.protocol import AdapterConfig, ConnectionManagerProtocol
from dbt.adapters.protocol import AdapterConfig
from dbt.clients.agate_helper import empty_table, merge_tables, table_from_rows
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
Expand All @@ -60,7 +60,7 @@
)
from dbt.utils import filter_null_values, executor, cast_to_str, AttrDict

from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.connections import Connection, AdapterResponse, BaseConnectionManager
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName,
Expand Down Expand Up @@ -208,7 +208,7 @@ class BaseAdapter(metaclass=AdapterMeta):

Relation: Type[BaseRelation] = BaseRelation
Column: Type[BaseColumn] = BaseColumn
ConnectionManager: Type[ConnectionManagerProtocol]
ConnectionManager: Type[BaseConnectionManager]

# A set of clobber config fields accepted by this adapter
# for use in materializations
Expand Down Expand Up @@ -315,14 +315,21 @@ def get_column_schema_from_query(self, sql: str) -> List[BaseColumn]:

@available.parse(lambda *a, **k: ("", empty_table()))
def get_partitions_metadata(self, table: str) -> Tuple[agate.Table]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably live in dbt-bigquery, based on the comment. But to preserve backwards compatibility, I checked that the method exists prior to calling it. This makes mypy happy while preserving backwards compatibility.

"""Obtain partitions metadata for a BigQuery partitioned table.
"""
TODO: Can we move this to dbt-bigquery?
Obtain partitions metadata for a BigQuery partitioned table.

:param str table_id: a partitioned table id, in standard SQL format.
:param str table: a partitioned table id, in standard SQL format.
:return: a partition metadata tuple, as described in
https://cloud.google.com/bigquery/docs/creating-partitioned-tables#getting_partition_metadata_using_meta_tables.
:rtype: agate.Table
"""
return self.connections.get_partitions_metadata(table=table)
if hasattr(self.connections, "get_partitions_metadata"):
return self.connections.get_partitions_metadata(table=table)
else:
raise NotImplementedError(
"`get_partitions_metadata` is not implemented for this adapter!"
)

###
# Methods that should never be overridden
Expand Down Expand Up @@ -453,9 +460,10 @@ def _relations_cache_for_schemas(
# it's possible that there were no relations in some schemas. We want
# to insert the schemas we query into the cache's `.schemas` attribute
# so we can check it later
cache_update: Set[Tuple[Optional[str], Optional[str]]] = set()
cache_update: Set[Tuple[Optional[str], str]] = set()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type aligns with the change below and with what update_schemas expects (which is the only reason for this variable).

for relation in cache_schemas:
cache_update.add((relation.database, relation.schema))
if relation.schema:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mypy balked at relation.schema potentially not existing. That's because BaseRelation lists all of its parts as optional (though at least one must exist). In this scenario, we would expect schema to exist, as the type above suggests. Adding this check satisfies mypy and doesn't deviate much from the existing code.

cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(
Expand Down
10 changes: 1 addition & 9 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import time
from typing import List, Optional, Tuple, Any, Iterable, Dict, Union
from typing import List, Optional, Tuple, Any, Iterable, Dict

import agate

Expand Down Expand Up @@ -131,14 +131,6 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> agate.Tabl

return dbt.clients.agate_helper.table_from_data_flat(data, column_names)

@classmethod
def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already an abstract method. Move it up to ConnectionManager since BaseAdapter expects it to exist for some methods.

"""Get the string representation of the data type from the type_code."""
# https://peps.python.org/pep-0249/#type-objects
raise dbt.exceptions.NotImplementedError(
"`data_type_code_to_name` is not implemented for this adapter!"
)

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
) -> Tuple[AdapterResponse, agate.Table]:
Expand Down