From d0bffc83be079099dd1c0beb303973fbc94b5251 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 2 Jun 2021 23:22:44 -0700 Subject: [PATCH 1/2] fix(ingest): improve redshift ingestion performance --- metadata-ingestion/README.md | 2 +- metadata-ingestion/setup.py | 2 +- .../src/datahub/ingestion/source/redshift.py | 33 ++++++++++++++++--- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 90808ebf9b806..79f0a9f4af587 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -299,7 +299,7 @@ source: config: username: user password: pass - host_port: localhost:5432 + host_port: example.something.us-west-2.redshift.amazonaws.com:5439 database: DemoDatabase # table_pattern/schema_pattern is same as above # options is same as above diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 39fc4f5ebdd6d..53bfc8507f8a3 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -79,7 +79,7 @@ def get_long_description(): "mssql": sql_common | {"sqlalchemy-pytds>=0.3"}, "mysql": sql_common | {"pymysql>=1.0.2"}, "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"}, - "redshift": sql_common | {"psycopg2-binary", "GeoAlchemy2"}, + "redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, "snowflake": sql_common | {"snowflake-sqlalchemy"}, "oracle": sql_common | {"cx_Oracle"}, "ldap": {"python-ldap>=2.4"}, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift.py index 3d3c867282f9b..0ea5f85388a63 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift.py @@ -1,7 +1,32 @@ -from datahub.ingestion.source.postgres import PostgresSource +# These imports verifies that the dependencies are available. +import psycopg2 # noqa: F401 +import sqlalchemy_redshift # noqa: F401 + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.postgres import PostgresConfig from datahub.ingestion.source.sql_common import SQLAlchemySource +# TRICKY: it's necessary to import the Postgres source because +# that module has some side effects that we care about here. + + +class RedshiftConfig(PostgresConfig): + # Although Amazon Redshift is compatible with Postgres's wire format, + # we actually want to use the sqlalchemy-redshift package and dialect + # because it has better caching behavior. In particular, it queries + # the full table, column, and constraint information in a single larger + # query, and then simply pulls out the relevant information as needed. + # Because of this behavior, it uses dramatically fewer round trips for + # large Redshift warehouses. As an example, see this query for the columns: + # https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/blob/60b4db04c1d26071c291aeea52f1dcb5dd8b0eb0/sqlalchemy_redshift/dialect.py#L745. + scheme = "redshift+psycopg2" + + +class RedshiftSource(SQLAlchemySource): + def __init__(self, config: RedshiftConfig, ctx: PipelineContext): + super().__init__(config, ctx, "redshift") -class RedshiftSource(PostgresSource): - def __init__(self, config, ctx): - SQLAlchemySource.__init__(self, config, ctx, "redshift") + @classmethod + def create(cls, config_dict, ctx): + config = RedshiftConfig.parse_obj(config_dict) + return cls(config, ctx) From b134587978b1ad679a83daa947a77f89ae7ef2cc Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 3 Jun 2021 10:25:51 -0700 Subject: [PATCH 2/2] monkey patch table comments --- .../src/datahub/ingestion/source/redshift.py | 47 ++++++++++++++++++- .../datahub/ingestion/source/sql_common.py | 9 ++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift.py index 0ea5f85388a63..ae9fe7cd62617 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift.py @@ -1,6 +1,10 @@ -# These imports verifies that the dependencies are available. +from typing import Dict + +# These imports verify that the dependencies are available. import psycopg2 # noqa: F401 import sqlalchemy_redshift # noqa: F401 +from sqlalchemy.engine import reflection +from sqlalchemy_redshift.dialect import RedshiftDialect, RelationKey from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.postgres import PostgresConfig @@ -22,6 +26,47 @@ class RedshiftConfig(PostgresConfig): scheme = "redshift+psycopg2" +# reflection.cache uses eval and other magic to partially rewrite the function. +# mypy can't handle it, so we ignore it for now. +@reflection.cache # type: ignore +def _get_all_table_comments(self, connection, **kw): + COMMENT_SQL = """ + SELECT n.nspname as schema, + c.relname as table_name, + pgd.description as table_comment + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid + WHERE c.relkind in ('r', 'v', 'm', 'f', 'p') + AND pgd.description IS NOT NULL + ORDER BY "schema", "table_name"; + """ + + all_table_comments: Dict[RelationKey, str] = {} + + result = connection.execute(COMMENT_SQL) + for table in result: + key = RelationKey(table.table_name, table.schema, connection) + all_table_comments[key] = table.table_comment + + return all_table_comments + + +@reflection.cache # type: ignore +def get_table_comment(self, connection, table_name, schema=None, **kw): + all_table_comments = self._get_all_table_comments(connection, **kw) + key = RelationKey(table_name, schema, connection) + if key not in all_table_comments.keys(): + key = key.unquoted() + return {"text": all_table_comments.get(key)} + + +# This monkey-patching enables us to batch fetch the table descriptions, rather than +# fetching them one at a time. +RedshiftDialect._get_all_table_comments = _get_all_table_comments +RedshiftDialect.get_table_comment = get_table_comment + + class RedshiftSource(SQLAlchemySource): def __init__(self, config: RedshiftConfig, ctx: PipelineContext): super().__init__(config, ctx, "redshift") diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py index 270416d4d9d8b..c5f66b02cf01c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py @@ -200,7 +200,10 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str def get_workunits(self) -> Iterable[SqlWorkUnit]: sql_config = self.config - platform = self.platform + if logger.isEnabledFor(logging.DEBUG): + # If debug logging is enabled, we also want to echo each SQL query issued. + sql_config.options["echo"] = True + url = sql_config.get_sql_alchemy_url() logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **sql_config.options) @@ -235,7 +238,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]: # TODO: capture inspector.get_sorted_table_and_fkc_names dataset_snapshot = DatasetSnapshot( - urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})", + urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})", aspects=[], ) if description is not None or properties: @@ -246,7 +249,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]: ) dataset_snapshot.aspects.append(dataset_properties) schema_metadata = get_schema_metadata( - self.report, dataset_name, platform, columns + self.report, dataset_name, self.platform, columns ) dataset_snapshot.aspects.append(schema_metadata)