Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ source:
config:
username: user
password: pass
host_port: localhost:5432
host_port: example.something.us-west-2.redshift.amazonaws.com:5439
database: DemoDatabase
# table_pattern/schema_pattern is same as above
# options is same as above
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def get_long_description():
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mysql": sql_common | {"pymysql>=1.0.2"},
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redshift": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"snowflake": sql_common | {"snowflake-sqlalchemy"},
"oracle": sql_common | {"cx_Oracle"},
"ldap": {"python-ldap>=2.4"},
Expand Down
78 changes: 74 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/redshift.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,77 @@
from datahub.ingestion.source.postgres import PostgresSource
from typing import Dict

# These imports verify that the dependencies are available.
import psycopg2 # noqa: F401
import sqlalchemy_redshift # noqa: F401
from sqlalchemy.engine import reflection
from sqlalchemy_redshift.dialect import RedshiftDialect, RelationKey

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.postgres import PostgresConfig
from datahub.ingestion.source.sql_common import SQLAlchemySource

# TRICKY: the import of PostgresConfig above pulls in the postgres source
# module, whose import-time side effects we rely on here — keep that import
# even though it may look removable.


class RedshiftConfig(PostgresConfig):
    """Connection configuration for the Redshift ingestion source.

    Although Amazon Redshift is compatible with Postgres's wire format, we
    deliberately use the sqlalchemy-redshift package and dialect because of
    its better caching behavior: it pulls the full table, column, and
    constraint information in a single larger query, then simply serves the
    relevant pieces from that result as needed. That means dramatically
    fewer round trips for large Redshift warehouses. As an example, see the
    column query here:
    https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/blob/60b4db04c1d26071c291aeea52f1dcb5dd8b0eb0/sqlalchemy_redshift/dialect.py#L745.
    """

    # Select the sqlalchemy-redshift dialect rather than plain postgres.
    scheme = "redshift+psycopg2"


# reflection.cache uses eval and other magic to partially rewrite the function.
# mypy can't handle it, so we ignore it for now.
@reflection.cache  # type: ignore
def _get_all_table_comments(self, connection, **kw):
    """Fetch every relation comment in the database with one query.

    Returns a dict mapping each RelationKey (relation name + schema) to its
    comment text. Wrapped in reflection.cache so the query is issued once
    per reflection session rather than once per table.
    """
    comment_query = """
        SELECT n.nspname as schema,
               c.relname as table_name,
               pgd.description as table_comment
        FROM pg_catalog.pg_class c
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid
        WHERE c.relkind in ('r', 'v', 'm', 'f', 'p')
        AND pgd.description IS NOT NULL
        ORDER BY "schema", "table_name";
    """

    rows = connection.execute(comment_query)
    return {
        RelationKey(row.table_name, row.schema, connection): row.table_comment
        for row in rows
    }


@reflection.cache  # type: ignore
def get_table_comment(self, connection, table_name, schema=None, **kw):
    """Return {"text": comment} for one table, served from the batched cache.

    Falls back to the unquoted form of the relation key when the quoted
    key is not present in the cached comment map.
    """
    comments = self._get_all_table_comments(connection, **kw)
    lookup_key = RelationKey(table_name, schema, connection)
    if lookup_key not in comments:
        lookup_key = lookup_key.unquoted()
    return {"text": comments.get(lookup_key)}


# This monkey-patching enables us to batch fetch the table descriptions, rather than
# fetching them one at a time. The stock RedshiftDialect.get_table_comment issues a
# query per table; our replacement delegates to _get_all_table_comments, which loads
# every comment in one query and caches the result for the reflection session.
RedshiftDialect._get_all_table_comments = _get_all_table_comments
RedshiftDialect.get_table_comment = get_table_comment


class RedshiftSource(SQLAlchemySource):
    """DataHub ingestion source for Amazon Redshift.

    Thin wrapper over the generic SQLAlchemy-based extraction: all the
    Redshift-specific behavior comes from RedshiftConfig (which selects the
    sqlalchemy-redshift dialect) plus the dialect monkey-patching above.
    """

    # NOTE: the pasted diff interleaved the *removed* PostgresSource-based
    # class here; this is the single, coherent post-change definition.

    def __init__(self, config: RedshiftConfig, ctx: PipelineContext):
        # "redshift" is the DataHub platform name used in emitted URNs.
        super().__init__(config, ctx, "redshift")

    @classmethod
    def create(cls, config_dict: Dict, ctx: PipelineContext) -> "RedshiftSource":
        """Standard source factory: parse the raw config dict, then build."""
        config = RedshiftConfig.parse_obj(config_dict)
        return cls(config, ctx)
9 changes: 6 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str

def get_workunits(self) -> Iterable[SqlWorkUnit]:
sql_config = self.config
platform = self.platform
if logger.isEnabledFor(logging.DEBUG):
# If debug logging is enabled, we also want to echo each SQL query issued.
sql_config.options["echo"] = True

url = sql_config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **sql_config.options)
Expand Down Expand Up @@ -235,7 +238,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})",
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
Expand All @@ -246,7 +249,7 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, platform, columns
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(schema_metadata)

Expand Down