From 0fa27b591ae4a0b1bb1d935babb2850497703a9e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 28 Jun 2021 17:35:10 -0700 Subject: [PATCH 1/4] sqlalchemy uri refactoring --- .../src/datahub/ingestion/source/athena.py | 28 +++++----- .../datahub/ingestion/source/aws_common.py | 3 +- .../src/datahub/ingestion/source/dbt.py | 4 +- .../src/datahub/ingestion/source/druid.py | 3 +- .../src/datahub/ingestion/source/feast.py | 3 +- .../src/datahub/ingestion/source/glue.py | 1 - .../src/datahub/ingestion/source/kafka.py | 4 +- .../datahub/ingestion/source/kafka_connect.py | 2 +- .../src/datahub/ingestion/source/looker.py | 4 +- .../src/datahub/ingestion/source/lookml.py | 4 +- .../src/datahub/ingestion/source/mongodb.py | 4 +- .../src/datahub/ingestion/source/mssql.py | 2 +- .../src/datahub/ingestion/source/snowflake.py | 48 ++++++++++++----- .../ingestion/source/snowflake_usage.py | 10 ++-- .../datahub/ingestion/source/sql_common.py | 53 ++++++++++++++----- .../src/datahub/ingestion/source/superset.py | 3 +- 16 files changed, 111 insertions(+), 65 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/athena.py b/metadata-ingestion/src/datahub/ingestion/source/athena.py index 105a7a0a0f0ff0..bd029daade2f00 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/athena.py @@ -1,7 +1,6 @@ from typing import Optional -from urllib.parse import quote_plus -from .sql_common import SQLAlchemyConfig, SQLAlchemySource +from .sql_common import SQLAlchemyConfig, SQLAlchemySource, make_sqlalchemy_uri class AthenaConfig(SQLAlchemyConfig): @@ -14,20 +13,17 @@ class AthenaConfig(SQLAlchemyConfig): work_group: str def get_sql_alchemy_url(self): - url = f"{self.scheme}://" - if self.username: - url += f"{quote_plus(self.username)}" - if self.password: - url += f":{quote_plus(self.password)}" - else: - url += ":" - url += f"@athena.{self.aws_region}.amazonaws.com:443/" - if self.database: - url += f"{self.database}" - url += f"?s3_staging_dir={quote_plus(self.s3_staging_dir)}" - url += f"&work_group={self.work_group}" - - return url + return make_sqlalchemy_uri( + self.scheme, + self.username or "", + self.password, + f"athena.{self.aws_region}.amazonaws.com:443", + self.database, + uri_opts={ + "s3_staging_dir": self.s3_staging_dir, + "work_group": self.work_group, + }, + ) class AthenaSource(SQLAlchemySource): diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py index 03a0c44699fb1f..5ce8a7eed1f1f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py @@ -5,6 +5,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern +from datahub.emitter.mce_builder import DEFAULT_ENV def assume_role( @@ -34,7 +35,7 @@ class AwsSourceConfig(ConfigModel): - SageMaker source """ - env: str = "PROD" + env: str = DEFAULT_ENV database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py index 82a7d758f89e3d..fa9832db8a7a3d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py @@ -6,7 +6,7 @@ from datahub.configuration import ConfigModel from 
datahub.configuration.common import AllowDenyPattern -from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -44,7 +44,7 @@ class DBTConfig(ConfigModel): manifest_path: str catalog_path: str sources_path: Optional[str] - env: str = "PROD" + env: str = DEFAULT_ENV target_platform: str load_schemas: bool node_type_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() diff --git a/metadata-ingestion/src/datahub/ingestion/source/druid.py b/metadata-ingestion/src/datahub/ingestion/source/druid.py index 486e29e891754d..49b9d38eaebcf0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/druid.py +++ b/metadata-ingestion/src/datahub/ingestion/source/druid.py @@ -12,7 +12,7 @@ class DruidConfig(BasicSQLAlchemyConfig): schema_pattern: AllowDenyPattern = AllowDenyPattern(deny=["^(lookup|sys).*"]) def get_sql_alchemy_url(self): - return f"{BasicSQLAlchemyConfig.get_sql_alchemy_url(self)}/druid/v2/sql/" + return f"{super().get_sql_alchemy_url(self)}/druid/v2/sql/" """ The pydruid library already formats the table name correctly, so we do not @@ -20,7 +20,6 @@ def get_sql_alchemy_url(self): every URN would incorrectly start with "druid. For more information, see https://druid.apache.org/docs/latest/querying/sql.html#schemata-table - """ def get_identifier(self, schema: str, table: str) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 4cbfae0bc46341..dbb397be54f187 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -9,6 +9,7 @@ import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel +from datahub.emitter.mce_builder import DEFAULT_ENV from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -45,8 +46,6 @@ "UNIX_TIMESTAMP_LIST": MLFeatureDataType.SEQUENCE, } -DEFAULT_ENV = "PROD" - # image to use for initial feast extraction HOSTED_FEAST_IMAGE = "acryldata/datahub-ingestion-feast-wrapper" diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index e22bcfd8a19295..8e152aa03ab259 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -46,7 +46,6 @@ class GlueSourceConfig(AwsSourceConfig): - extract_transforms: Optional[bool] = True @property diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 29ca9249aef497..e13ea498f6c807 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -9,7 +9,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig -from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from 
datahub.ingestion.api.workunit import MetadataWorkUnit @@ -26,7 +26,7 @@ class KafkaSourceConfig(ConfigModel): - env: str = "PROD" + env: str = DEFAULT_ENV # TODO: inline the connection config connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig() topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 7c5bc1e6583fb7..4e8a3a678c4f88 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -23,7 +23,7 @@ class KafkaConnectSourceConfig(ConfigModel): username: Optional[str] = None password: Optional[str] = None cluster_name: Optional[str] = "connect-cluster" - env: str = "PROD" + env: str = builder.DEFAULT_ENV connector_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py index ac6536e0110709..1d95fc317818ec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py @@ -17,7 +17,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern -from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -52,7 +52,7 @@ class LookerDashboardSourceConfig(ConfigModel): actor: str = "urn:li:corpuser:etl" dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() - env: str = "PROD" + env: str = DEFAULT_ENV @dataclass diff --git a/metadata-ingestion/src/datahub/ingestion/source/lookml.py b/metadata-ingestion/src/datahub/ingestion/source/lookml.py index 110fbc8b943754..e622a17611fcf9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/lookml.py +++ b/metadata-ingestion/src/datahub/ingestion/source/lookml.py @@ -17,7 +17,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern -from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -57,7 +57,7 @@ class LookMLSourceConfig(ConfigModel): # pragma: no cover actor: str = "urn:li:corpuser:etl" model_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() - env: str = "PROD" + env: str = DEFAULT_ENV parse_table_names_from_sql: bool = False diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index 463227ba2f1dc6..48a52575e61200 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -11,7 +11,7 @@ from pymongo.mongo_client import MongoClient from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.emitter.mce_builder import get_sys_time +from 
datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -41,8 +41,6 @@ # https://stackoverflow.com/a/48273736/5004662. DENY_DATABASE_LIST = set(["admin", "config", "local"]) -DEFAULT_ENV = "PROD" - class MongoDBConfig(ConfigModel): # See the MongoDB authentication docs for details and examples. diff --git a/metadata-ingestion/src/datahub/ingestion/source/mssql.py b/metadata-ingestion/src/datahub/ingestion/source/mssql.py index 8f7cb8a8edffe4..d988fee8dc3e36 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mssql.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mssql.py @@ -31,8 +31,8 @@ def get_sql_alchemy_url(self): import pyodbc # noqa: F401 self.scheme = "mssql+pyodbc" - uri = super().get_sql_alchemy_url() + uri = super().get_sql_alchemy_url(uri_opts=None) if self.use_odbc: uri = f"{uri}?{urllib.parse.urlencode(self.uri_args)}" return uri diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake.py index 5ddc217e03266e..3cd5244509be66 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake.py @@ -1,13 +1,19 @@ +import logging from typing import Optional +import pydantic + # This import verifies that the dependencies are available. import snowflake.sqlalchemy # noqa: F401 from snowflake.sqlalchemy import custom_types +from datahub.configuration.common import AllowDenyPattern, ConfigModel + from .sql_common import ( - BasicSQLAlchemyConfig, + SQLAlchemyConfig, SQLAlchemySource, TimeTypeClass, + make_sqlalchemy_uri, register_custom_type, ) @@ -15,26 +21,44 @@ register_custom_type(custom_types.TIMESTAMP_LTZ, TimeTypeClass) register_custom_type(custom_types.TIMESTAMP_NTZ, TimeTypeClass) +logger: logging.Logger = logging.getLogger(__name__) + -class SnowflakeConfig(BasicSQLAlchemyConfig): +class BaseSnowflakeConfig(ConfigModel): # Note: this config model is also used by the snowflake-usage source. scheme = "snowflake" - database: str # database is required + username: Optional[str] = None + password: Optional[str] = None + host_port: str warehouse: Optional[str] role: Optional[str] + def get_sql_alchemy_url(self, database=None): + return make_sqlalchemy_uri( + self.scheme, + self.username, + self.password, + self.host_port, + database, + uri_opts={ + # Drop the options if value is None. 
+ key: value + for (key, value) in { + "warehouse": self.warehouse, + "role": self.role, + }.items() + if value + }, + ) + + +class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig): + database: str + def get_sql_alchemy_url(self): - connect_string = super().get_sql_alchemy_url() - options = { - "warehouse": self.warehouse, - "role": self.role, - } - params = "&".join(f"{key}={value}" for (key, value) in options.items() if value) - if params: - connect_string = f"{connect_string}?{params}" - return connect_string + return super().get_sql_alchemy_url(self.database) def get_identifier(self, schema: str, table: str) -> str: regular = super().get_identifier(schema, table) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py index 1c3386f366875a..c764e53f9deb98 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py @@ -13,7 +13,7 @@ import datahub.emitter.mce_builder as builder from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import UsageStatsWorkUnit -from datahub.ingestion.source.snowflake import SnowflakeConfig +from datahub.ingestion.source.snowflake import BaseSnowflakeConfig, SnowflakeConfig from datahub.ingestion.source.usage_common import ( BaseUsageConfig, GenericAggregatedDataset, @@ -86,8 +86,9 @@ class SnowflakeJoinedAccessEvent: role_name: str -class SnowflakeUsageConfig(SnowflakeConfig, BaseUsageConfig): - database: str = "snowflake" +class SnowflakeUsageConfig(BaseSnowflakeConfig, BaseUsageConfig): + env: str = builder.DEFAULT_ENV + options: dict = {} @pydantic.validator("role", always=True) def role_accountadmin(cls, v): @@ -100,6 +101,9 @@ def role_accountadmin(cls, v): ) return v + def get_sql_alchemy_url(self): + return super().get_sql_alchemy_url(database="snowflake") + @dataclasses.dataclass class SnowflakeUsageSource(Source): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py index cb8668d9969ac9..48632dcbdbf6c4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py @@ -2,13 +2,14 @@ from abc import abstractmethod from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type +from urllib.parse import quote_plus from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql import sqltypes as types from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -35,6 +36,34 @@ logger: logging.Logger = logging.getLogger(__name__) +def make_sqlalchemy_uri( + scheme: str, + username: Optional[str], + password: Optional[str], + at: Optional[str], + db: Optional[str], + uri_opts: Optional[Dict[str, Any]] = None, +) -> str: + url = f"{scheme}://" + if username is not None: + url += f"{quote_plus(username)}" + if password is not None: + url += f":{quote_plus(password)}" + url += "@" + if at is not None: + url += f"{at}" + if db is not None: + url += 
f"/{db}" + if uri_opts is not None: + if db is None: + url += "/" + params = "&".join( + f"{key}={quote_plus(value)}" for (key, value) in uri_opts.items() if value + ) + url = f"{url}?{params}" + return url + + @dataclass class SQLSourceReport(SourceReport): tables_scanned: int = 0 @@ -57,7 +86,7 @@ def report_dropped(self, ent_name: str) -> None: class SQLAlchemyConfig(ConfigModel): - env: str = "PROD" + env: str = DEFAULT_ENV options: dict = {} # Although the 'table_pattern' enables you to skip everything from certain schemas, # having another option to allow/deny on schema level is an optimization for the case when there is a large number @@ -92,17 +121,15 @@ class BasicSQLAlchemyConfig(SQLAlchemyConfig): database: Optional[str] = None scheme: str - def get_sql_alchemy_url(self): - url = f"{self.scheme}://" - if self.username: - url += f"{self.username}" - if self.password: - url += f":{self.password}" - url += "@" - url += f"{self.host_port}" - if self.database: - url += f"/{self.database}" - return url + def get_sql_alchemy_url(self, uri_opts=None): + return make_sqlalchemy_uri( + self.scheme, + self.username, + self.password, + self.host_port, + self.database, + uri_opts=uri_opts, + ) @dataclass diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 7768fa2f1a7a37..5d26718db7bd36 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -6,6 +6,7 @@ import requests from datahub.configuration.common import ConfigModel +from datahub.emitter.mce_builder import DEFAULT_ENV from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -74,8 +75,6 @@ def get_platform_from_sqlalchemy_uri(sqlalchemy_uri: str) -> str: "box_plot": ChartTypeClass.BAR, } -DEFAULT_ENV = "PROD" - class SupersetConfig(ConfigModel): # See the Superset /security/login endpoint for details From 5385b765483603cd3242dc0fde6c183f848ae571 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 28 Jun 2021 17:45:21 -0700 Subject: [PATCH 2/4] fix import --- .../src/datahub/ingestion/source/snowflake_usage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py index c764e53f9deb98..629d38a8c729f0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake_usage.py @@ -13,7 +13,7 @@ import datahub.emitter.mce_builder as builder from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import UsageStatsWorkUnit -from datahub.ingestion.source.snowflake import BaseSnowflakeConfig, SnowflakeConfig +from datahub.ingestion.source.snowflake import BaseSnowflakeConfig from datahub.ingestion.source.usage_common import ( BaseUsageConfig, GenericAggregatedDataset, From 608cf31e88cf1ebc0c48a017e6cb37c5584188c3 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 29 Jun 2021 09:19:09 -0700 Subject: [PATCH 3/4] fix import paths --- metadata-ingestion/src/datahub/ingestion/source/snowflake.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake.py 
index 3cd5244509be66..0090828da46f17 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake.py @@ -1,13 +1,11 @@ import logging from typing import Optional -import pydantic - # This import verifies that the dependencies are available. import snowflake.sqlalchemy # noqa: F401 from snowflake.sqlalchemy import custom_types -from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.configuration.common import ConfigModel from .sql_common import ( SQLAlchemyConfig, From 2478bc21bd25ce242b82c552ffaecb0917d532c0 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 29 Jun 2021 23:11:12 -0700 Subject: [PATCH 4/4] fix diff --- metadata-ingestion/src/datahub/ingestion/source/glue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index 8e152aa03ab259..e22bcfd8a19295 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -46,6 +46,7 @@ class GlueSourceConfig(AwsSourceConfig): + extract_transforms: Optional[bool] = True @property
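
For reference, a short sketch of how the make_sqlalchemy_uri() helper added to sql_common.py in PATCH 1 is meant to be called. This is illustrative only and not part of the patch series: the "postgresql" and "awsathena+rest" schemes, the hosts, credentials, database names, and the S3 staging bucket are placeholder assumptions, and the printed URIs are the approximate shapes implied by the helper's quote_plus handling shown above.

from datahub.ingestion.source.sql_common import make_sqlalchemy_uri

# Plain user/password/host/database URI, as BasicSQLAlchemyConfig.get_sql_alchemy_url
# now builds it. quote_plus() escapes the password, so special characters survive.
print(make_sqlalchemy_uri("postgresql", "datahub", "p@ss word", "localhost:5432", "analytics"))
# -> postgresql://datahub:p%40ss+word@localhost:5432/analytics

# URI with extra query options, in the style of the Athena and Snowflake configs.
print(
    make_sqlalchemy_uri(
        "awsathena+rest",                        # assumed scheme, not shown in the hunk above
        "",                                      # Athena passes `self.username or ""`
        None,
        "athena.us-west-2.amazonaws.com:443",    # placeholder region/host
        "my_db",
        uri_opts={
            "s3_staging_dir": "s3://my-bucket/results/",  # placeholder bucket
            "work_group": "primary",
        },
    )
)
# -> roughly:
# awsathena+rest://@athena.us-west-2.amazonaws.com:443/my_db?s3_staging_dir=s3%3A%2F%2Fmy-bucket%2Fresults%2F&work_group=primary

The empty-string versus None distinction matters here: only options with truthy values are kept in the query string, which is why the Snowflake config in PATCH 1 can pass warehouse and role unconditionally and let the helper drop the ones that are unset.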