Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix(ingest): improve redshift ingestion performance
  • Loading branch information
hsheth2 committed Jun 3, 2021
commit d0bffc83be079099dd1c0beb303973fbc94b5251
2 changes: 1 addition & 1 deletion metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ source:
config:
username: user
password: pass
host_port: localhost:5432
host_port: example.something.us-west-2.redshift.amazonaws.com:5439
database: DemoDatabase
# table_pattern/schema_pattern is same as above
# options is same as above
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def get_long_description():
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mysql": sql_common | {"pymysql>=1.0.2"},
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redshift": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"snowflake": sql_common | {"snowflake-sqlalchemy"},
"oracle": sql_common | {"cx_Oracle"},
"ldap": {"python-ldap>=2.4"},
Expand Down
33 changes: 29 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/redshift.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
from datahub.ingestion.source.postgres import PostgresSource
# These imports verify that the dependencies are available.
import psycopg2 # noqa: F401
import sqlalchemy_redshift # noqa: F401

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.postgres import PostgresConfig
from datahub.ingestion.source.sql_common import SQLAlchemySource

# TRICKY: it's necessary to import the Postgres source because
# that module has some side effects that we care about here.


class RedshiftConfig(PostgresConfig):
    # Configuration for the Redshift source. It is PostgresConfig with a
    # single override: the connection scheme set at the bottom of this class.
    #
    # Although Amazon Redshift is compatible with Postgres's wire format,
    # we actually want to use the sqlalchemy-redshift package and dialect
    # because it has better caching behavior. In particular, it queries
    # the full table, column, and constraint information in a single larger
    # query, and then simply pulls out the relevant information as needed.
    # Because of this behavior, it uses dramatically fewer round trips for
    # large Redshift warehouses. As an example, see this query for the columns:
    # https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/blob/60b4db04c1d26071c291aeea52f1dcb5dd8b0eb0/sqlalchemy_redshift/dialect.py#L745.
    scheme = "redshift+psycopg2"


class RedshiftSource(SQLAlchemySource):
def __init__(self, config: RedshiftConfig, ctx: PipelineContext):
    """Initialize the source by delegating to the parent constructor.

    The config and pipeline context are forwarded unchanged, together
    with the literal "redshift" as the third argument (presumably the
    platform name -- confirm against SQLAlchemySource.__init__).
    """
    super().__init__(config, ctx, "redshift")

class RedshiftSource(PostgresSource):
def __init__(self, config, ctx):
SQLAlchemySource.__init__(self, config, ctx, "redshift")
@classmethod
def create(cls, config_dict, ctx):
    """Build a source instance from an unparsed configuration.

    config_dict is parsed into a RedshiftConfig via parse_obj, and the
    resulting config is passed with ctx to the class constructor.
    Returns the newly constructed instance of cls.
    """
    config = RedshiftConfig.parse_obj(config_dict)
    return cls(config, ctx)