Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
sqlalchemy uri refactoring
  • Loading branch information
hsheth2 committed Jun 30, 2021
commit 0fa27b591ae4a0b1bb1d935babb2850497703a9e
28 changes: 12 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/athena.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Optional
from urllib.parse import quote_plus

from .sql_common import SQLAlchemyConfig, SQLAlchemySource
from .sql_common import SQLAlchemyConfig, SQLAlchemySource, make_sqlalchemy_uri


class AthenaConfig(SQLAlchemyConfig):
Expand All @@ -14,20 +13,17 @@ class AthenaConfig(SQLAlchemyConfig):
work_group: str

def get_sql_alchemy_url(self):
    """Build the SQLAlchemy connection URI for AWS Athena.

    Delegates credential quoting and query-option encoding to
    make_sqlalchemy_uri; s3_staging_dir and work_group are passed as
    URI query options.
    """
    # NOTE(review): the legacy hand-rolled string construction that
    # preceded this call was unreachable dead code (it returned before
    # the make_sqlalchemy_uri call could run) and has been removed.
    return make_sqlalchemy_uri(
        self.scheme,
        self.username or "",
        self.password,
        f"athena.{self.aws_region}.amazonaws.com:443",
        self.database,
        uri_opts={
            "s3_staging_dir": self.s3_staging_dir,
            "work_group": self.work_group,
        },
    )


class AthenaSource(SQLAlchemySource):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import DEFAULT_ENV


def assume_role(
Expand Down Expand Up @@ -34,7 +35,7 @@ class AwsSourceConfig(ConfigModel):
- SageMaker source
"""

env: str = "PROD"
env: str = DEFAULT_ENV

database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -44,7 +44,7 @@ class DBTConfig(ConfigModel):
manifest_path: str
catalog_path: str
sources_path: Optional[str]
env: str = "PROD"
env: str = DEFAULT_ENV
target_platform: str
load_schemas: bool
node_type_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ class DruidConfig(BasicSQLAlchemyConfig):
schema_pattern: AllowDenyPattern = AllowDenyPattern(deny=["^(lookup|sys).*"])

def get_sql_alchemy_url(self):
    """Return the base SQLAlchemy URI with Druid's SQL endpoint appended.

    Druid's SQLAlchemy dialect expects the /druid/v2/sql/ path suffix on
    the connection URL.
    """
    # Bug fix: `super().get_sql_alchemy_url(self)` passed `self` twice —
    # once implicitly through the bound super() proxy and once explicitly —
    # which raises TypeError at runtime. Call with no arguments.
    return f"{super().get_sql_alchemy_url()}/druid/v2/sql/"

"""
The pydruid library already formats the table name correctly, so we do not
need to use the schema name when constructing the URN. Without this override,
every URN would incorrectly start with "druid".

For more information, see https://druid.apache.org/docs/latest/querying/sql.html#schemata-table

"""

def get_identifier(self, schema: str, table: str) -> str:
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -45,8 +46,6 @@
"UNIX_TIMESTAMP_LIST": MLFeatureDataType.SEQUENCE,
}

DEFAULT_ENV = "PROD"

# image to use for initial feast extraction
HOSTED_FEAST_IMAGE = "acryldata/datahub-ingestion-feast-wrapper"

Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@


class GlueSourceConfig(AwsSourceConfig):

extract_transforms: Optional[bool] = True

@property
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand All @@ -26,7 +26,7 @@


class KafkaSourceConfig(ConfigModel):
env: str = "PROD"
env: str = DEFAULT_ENV
# TODO: inline the connection config
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class KafkaConnectSourceConfig(ConfigModel):
username: Optional[str] = None
password: Optional[str] = None
cluster_name: Optional[str] = "connect-cluster"
env: str = "PROD"
env: str = builder.DEFAULT_ENV
connector_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])


Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -52,7 +52,7 @@ class LookerDashboardSourceConfig(ConfigModel):
actor: str = "urn:li:corpuser:etl"
dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
env: str = "PROD"
env: str = DEFAULT_ENV


@dataclass
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -57,7 +57,7 @@ class LookMLSourceConfig(ConfigModel): # pragma: no cover
actor: str = "urn:li:corpuser:etl"
model_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
env: str = "PROD"
env: str = DEFAULT_ENV
parse_table_names_from_sql: bool = False


Expand Down
4 changes: 1 addition & 3 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pymongo.mongo_client import MongoClient

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mce_builder import DEFAULT_ENV, get_sys_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -41,8 +41,6 @@
# https://stackoverflow.com/a/48273736/5004662.
DENY_DATABASE_LIST = set(["admin", "config", "local"])

DEFAULT_ENV = "PROD"


class MongoDBConfig(ConfigModel):
# See the MongoDB authentication docs for details and examples.
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def get_sql_alchemy_url(self):
import pyodbc # noqa: F401

self.scheme = "mssql+pyodbc"
uri = super().get_sql_alchemy_url()

uri = super().get_sql_alchemy_url(uri_opts=None)
if self.use_odbc:
uri = f"{uri}?{urllib.parse.urlencode(self.uri_args)}"
return uri
Expand Down
48 changes: 36 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,64 @@
import logging
from typing import Optional

import pydantic

# This import verifies that the dependencies are available.
import snowflake.sqlalchemy # noqa: F401
from snowflake.sqlalchemy import custom_types

from datahub.configuration.common import AllowDenyPattern, ConfigModel

from .sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemyConfig,
SQLAlchemySource,
TimeTypeClass,
make_sqlalchemy_uri,
register_custom_type,
)

register_custom_type(custom_types.TIMESTAMP_TZ, TimeTypeClass)
register_custom_type(custom_types.TIMESTAMP_LTZ, TimeTypeClass)
register_custom_type(custom_types.TIMESTAMP_NTZ, TimeTypeClass)

logger: logging.Logger = logging.getLogger(__name__)


class SnowflakeConfig(BasicSQLAlchemyConfig):
class BaseSnowflakeConfig(ConfigModel):
# Note: this config model is also used by the snowflake-usage source.

scheme = "snowflake"

database: str # database is required
username: Optional[str] = None
password: Optional[str] = None
host_port: str
warehouse: Optional[str]
role: Optional[str]

def get_sql_alchemy_url(self, database=None):
    """Build the Snowflake SQLAlchemy URI for an optional target database.

    warehouse and role are forwarded as URI options only when they are
    set; unset (falsy) values are omitted entirely.
    """
    extra_opts = {}
    if self.warehouse:
        extra_opts["warehouse"] = self.warehouse
    if self.role:
        extra_opts["role"] = self.role
    return make_sqlalchemy_uri(
        self.scheme,
        self.username,
        self.password,
        self.host_port,
        database,
        uri_opts=extra_opts,
    )


class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
database: str

def get_sql_alchemy_url(self):
    """Build the SQLAlchemy URI, always targeting self.database.

    All URI assembly (credentials, warehouse/role options) is delegated
    to the base class implementation.
    """
    # NOTE(review): the duplicated legacy query-string assembly that
    # preceded this return was dead code (its own `return` made this
    # line unreachable) and has been removed; warehouse/role handling
    # now lives solely in the base class.
    return super().get_sql_alchemy_url(self.database)

def get_identifier(self, schema: str, table: str) -> str:
regular = super().get_identifier(schema, table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import UsageStatsWorkUnit
from datahub.ingestion.source.snowflake import SnowflakeConfig
from datahub.ingestion.source.snowflake import BaseSnowflakeConfig, SnowflakeConfig
from datahub.ingestion.source.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
Expand Down Expand Up @@ -86,8 +86,9 @@ class SnowflakeJoinedAccessEvent:
role_name: str


class SnowflakeUsageConfig(SnowflakeConfig, BaseUsageConfig):
database: str = "snowflake"
class SnowflakeUsageConfig(BaseSnowflakeConfig, BaseUsageConfig):
env: str = builder.DEFAULT_ENV
options: dict = {}

@pydantic.validator("role", always=True)
def role_accountadmin(cls, v):
Expand All @@ -100,6 +101,9 @@ def role_accountadmin(cls, v):
)
return v

def get_sql_alchemy_url(self):
return super().get_sql_alchemy_url(database="snowflake")


@dataclasses.dataclass
class SnowflakeUsageSource(Source):
Expand Down
Loading