Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
monkey patch table comments
  • Loading branch information
hsheth2 committed Jun 3, 2021
commit b134587978b1ad679a83daa947a77f89ae7ef2cc
47 changes: 46 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/redshift.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# These imports verifies that the dependencies are available.
from typing import Dict

# These imports verify that the dependencies are available.
import psycopg2 # noqa: F401
import sqlalchemy_redshift # noqa: F401
from sqlalchemy.engine import reflection
from sqlalchemy_redshift.dialect import RedshiftDialect, RelationKey

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.postgres import PostgresConfig
Expand All @@ -22,6 +26,47 @@ class RedshiftConfig(PostgresConfig):
scheme = "redshift+psycopg2"


# reflection.cache uses eval and other magic to partially rewrite the function.
# mypy can't handle it, so we ignore it for now.
@reflection.cache  # type: ignore
def _get_all_table_comments(self, connection, **kw):
    """Fetch the comments of all commented relations with a single query.

    The query joins ``pg_catalog.pg_class`` with ``pg_namespace`` and
    ``pg_description`` and keeps only relations whose ``relkind`` is one of
    ``r``, ``v``, ``m``, ``f`` or ``p`` and whose description is not NULL.

    Args:
        self: The object this function is attached to as a method.
        connection: Object whose ``execute`` method runs the SQL; it is also
            handed to ``RelationKey`` for every returned row.
        **kw: Not read in the body; presumably consumed by the
            ``reflection.cache`` decorator -- confirm against SQLAlchemy.

    Returns:
        A dict mapping ``RelationKey(table_name, schema, connection)`` to the
        comment text of that relation.
    """
    # The text of the literal is kept exactly as it was, so its lines are
    # deliberately not indented with the surrounding code.
    comment_query = """
SELECT n.nspname as schema,
c.relname as table_name,
pgd.description as table_comment
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid
WHERE c.relkind in ('r', 'v', 'm', 'f', 'p')
AND pgd.description IS NOT NULL
ORDER BY "schema", "table_name";
"""

    rows = connection.execute(comment_query)

    # One entry per returned row; a later row with an equal key replaces an
    # earlier one, exactly as repeated item assignment would.
    comments_by_relation: Dict[RelationKey, str] = {
        RelationKey(row.table_name, row.schema, connection): row.table_comment
        for row in rows
    }
    return comments_by_relation


@reflection.cache  # type: ignore
def get_table_comment(self, connection, table_name, schema=None, **kw):
    """Look up the comment of one table in the batch-fetched comment mapping.

    Args:
        self: The object this function is attached to as a method; it must
            provide ``_get_all_table_comments``.
        connection: Passed on to ``_get_all_table_comments`` and to
            ``RelationKey``.
        table_name: Name of the table whose comment is wanted.
        schema: Schema of the table, or ``None``.
        **kw: Forwarded unchanged to ``_get_all_table_comments``.

    Returns:
        ``{"text": comment}``; ``comment`` is ``None`` when neither the key as
        given nor its unquoted form is present in the mapping.
    """
    comments = self._get_all_table_comments(connection, **kw)
    relation = RelationKey(table_name, schema, connection)
    # Fall back to the unquoted form when the key as given has no entry.
    lookup = relation if relation in comments else relation.unquoted()
    return {"text": comments.get(lookup)}


# This monkey-patching enables us to batch fetch the table descriptions, rather than
# fetching them one at a time.
# Both module-level functions are assigned as attributes on RedshiftDialect:
# get_table_comment looks a single table up in the mapping returned by
# _get_all_table_comments, so the helper has to be attached to the class as well.
RedshiftDialect._get_all_table_comments = _get_all_table_comments
RedshiftDialect.get_table_comment = get_table_comment


class RedshiftSource(SQLAlchemySource):
def __init__(self, config: RedshiftConfig, ctx: PipelineContext):
super().__init__(config, ctx, "redshift")
Expand Down
9 changes: 6 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str

def get_workunits(self) -> Iterable[SqlWorkUnit]:
sql_config = self.config
platform = self.platform
if logger.isEnabledFor(logging.DEBUG):
# If debug logging is enabled, we also want to echo each SQL query issued.
sql_config.options["echo"] = True

url = sql_config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **sql_config.options)
Expand Down Expand Up @@ -235,7 +238,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})",
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
Expand All @@ -246,7 +249,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, platform, columns
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(schema_metadata)

Expand Down