Skip to content
Prev Previous commit
Next Next commit
update connections.py
  • Loading branch information
aranke committed May 9, 2023
commit 17e3d0d172b53b47c18f2802e637ec1e574d830e
11 changes: 7 additions & 4 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,16 @@ def process_results(
return [dict(zip(column_names, row)) for row in rows]

@classmethod
def get_result_from_cursor(cls, cursor: Any) -> agate.Table:
def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> agate.Table:
data: List[Any] = []
column_names: List[str] = []

if cursor.description is not None:
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
if limit:
rows = cursor.fetchmany(limit)
else:
rows = cursor.fetchall()
data = cls.process_results(column_names, rows)

return dbt.clients.agate_helper.table_from_data_flat(data, column_names)
Expand All @@ -138,13 +141,13 @@ def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
)

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
) -> Tuple[AdapterResponse, agate.Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor)
table = self.get_result_from_cursor(cursor, limit)
else:
table = dbt.clients.agate_helper.empty_table()
return response, table
Expand Down