Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor(sql_common): improved code
- Corrected deprecated SQLAlchemy inspector method
- Broke down the get_workunits function (code quality)
- Added view properties
  • Loading branch information
vlavorini committed Jun 9, 2021
commit 9805733d1988950d4351cbb6ae5ebb932d5f7b16
209 changes: 114 additions & 95 deletions metadata-ingestion/src/datahub/ingestion/source/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type

from sqlalchemy import create_engine
from sqlalchemy.engine import reflection
from sqlalchemy import create_engine, inspect
from sqlalchemy.sql import sqltypes as types

from datahub.configuration.common import AllowDenyPattern, ConfigModel
Expand Down Expand Up @@ -43,7 +42,9 @@ class SQLSourceReport(SourceReport):
filtered: List[str] = field(default_factory=list)

def report_table_scanned(self, table_name: str) -> None:
warnings.warn("report_table_scanned is deprecated, please use report_entity_scanned with argument `table`")
warnings.warn(
"report_table_scanned is deprecated, please use report_entity_scanned with argument `table`"
)
self.tables_scanned += 1

def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
Expand Down Expand Up @@ -83,11 +84,11 @@ def get_identifier(self, schema: str, table: str) -> str:
return f"{schema}.{table}"

def standardize_schema_table_names(
self, schema: str, table: str
self, schema: str, entity: str
) -> Tuple[str, str]:
# Some SQLAlchemy dialects need a standardization step to clean the schema
# and table names. See BigQuery for an example of when this is useful.
return schema, table
return schema, entity


class BasicSQLAlchemyConfig(SQLAlchemyConfig):
Expand Down Expand Up @@ -225,104 +226,122 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
url = sql_config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **sql_config.options)
inspector = reflection.Inspector.from_engine(engine)
inspector = inspect(engine)
for schema in inspector.get_schema_names():
if not sql_config.schema_pattern.allowed(schema):
self.report.report_dropped(schema)
continue

if sql_config.include_tables:
for table in inspector.get_table_names(schema):
schema, table = sql_config.standardize_schema_table_names(schema, table)
dataset_name = sql_config.get_identifier(schema, table)
self.report.report_entity_scanned(dataset_name, ent_type="table")

if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue

columns = inspector.get_columns(table, schema)
try:
table_info: dict = inspector.get_table_comment(table, schema)
except NotImplementedError:
description: Optional[str] = None
properties: Dict[str, str] = {}
else:
description = table_info["text"]

# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = table_info.get("properties", {})

# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(schema_metadata)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
wu = SqlWorkUnit(id=dataset_name, mce=mce)
self.report.report_workunit(wu)
yield wu
yield from self.loop_tables(inspector, schema, sql_config)

if sql_config.include_views:
for view in inspector.get_view_names(schema):
# TODO : change "standardize_schema_table_names" function name: it will be the same for tables and views
schema, view = sql_config.standardize_schema_table_names(schema, view)
dataset_name = sql_config.get_identifier(schema, view)
self.report.report_entity_scanned(dataset_name, ent_type="view")

if not sql_config.view_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue

columns = inspector.get_columns(view, schema)
try:
view_info: dict = inspector.get_table_comment(view, schema)
except NotImplementedError:
description: Optional[str] = None
properties: Dict[str, str] = {}
else:
description = view_info["text"]

# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = view_info.get("properties", {})

# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(schema_metadata)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
wu = SqlWorkUnit(id=dataset_name, mce=mce)
self.report.report_workunit(wu)
yield wu
yield from self.loop_views(inspector, schema, sql_config)

def loop_tables(
    self,
    inspector: Any,
    schema: str,
    sql_config: SQLAlchemyConfig,
) -> Iterable[SqlWorkUnit]:
    """Scan every table in ``schema`` and yield one metadata workunit per
    table allowed by the source configuration.

    Tables rejected by ``table_pattern`` are recorded as dropped in the
    report and skipped; each remaining table is turned into a
    DatasetSnapshot MCE carrying schema metadata and, when the dialect
    supports it, the table comment/properties.
    """
    for raw_name in inspector.get_table_names(schema):
        # Some dialects (e.g. BigQuery) need a cleanup step on names.
        schema, table = sql_config.standardize_schema_table_names(schema, raw_name)
        dataset_name = sql_config.get_identifier(schema, table)
        self.report.report_entity_scanned(dataset_name, ent_type="table")

        if not sql_config.table_pattern.allowed(dataset_name):
            self.report.report_dropped(dataset_name)
            continue

        columns = inspector.get_columns(table, schema)

        # Table comments are optional in SQLAlchemy dialects; fall back to
        # "no description / no properties" when unimplemented.
        description: Optional[str] = None
        properties: Dict[str, str] = {}
        try:
            table_info: dict = inspector.get_table_comment(table, schema)
        except NotImplementedError:
            pass
        else:
            description = table_info["text"]
            # The "properties" field is a non-standard addition to
            # SQLAlchemy's interface.
            properties = table_info.get("properties", {})

        # TODO: capture inspector.get_pk_constraint
        # TODO: capture inspector.get_sorted_table_and_fkc_names

        dataset_snapshot = DatasetSnapshot(
            urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
            aspects=[],
        )
        if description is not None or properties:
            dataset_snapshot.aspects.append(
                DatasetPropertiesClass(
                    description=description,
                    customProperties=properties,
                )
            )
        dataset_snapshot.aspects.append(
            get_schema_metadata(self.report, dataset_name, self.platform, columns)
        )

        wu = SqlWorkUnit(
            id=dataset_name,
            mce=MetadataChangeEvent(proposedSnapshot=dataset_snapshot),
        )
        self.report.report_workunit(wu)
        yield wu

def loop_views(
    self,
    inspector: Any,
    schema: str,
    sql_config: SQLAlchemyConfig,
) -> Iterable[SqlWorkUnit]:
    """Scan every view in ``schema`` and yield one metadata workunit per
    view allowed by the source configuration.

    Views rejected by ``view_pattern`` are recorded as dropped in the
    report and skipped; each remaining view is turned into a
    DatasetSnapshot MCE carrying schema metadata plus the view definition
    (when the dialect exposes it) in the custom properties.
    """
    for view in inspector.get_view_names(schema):
        # Some dialects (e.g. BigQuery) need a cleanup step on names.
        schema, view = sql_config.standardize_schema_table_names(schema, view)
        dataset_name = sql_config.get_identifier(schema, view)
        self.report.report_entity_scanned(dataset_name, ent_type="view")

        if not sql_config.view_pattern.allowed(dataset_name):
            self.report.report_dropped(dataset_name)
            continue

        columns = inspector.get_columns(view, schema)
        try:
            view_info: dict = inspector.get_table_comment(view, schema)
        except NotImplementedError:
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        else:
            description = view_info["text"]

            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = view_info.get("properties", {})

        try:
            # Fix: pass the schema so views outside the default schema are
            # resolved correctly (Inspector.get_view_definition(view_name, schema)).
            view_definition = inspector.get_view_definition(view, schema)
        except NotImplementedError:
            # Not every dialect implements view-definition reflection;
            # mirror the guarded get_table_comment handling above.
            view_definition = ""
        if view_definition is None:
            view_definition = ""
        properties["view_definition"] = view_definition
        properties["is_view"] = "True"

        dataset_snapshot = DatasetSnapshot(
            urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
            aspects=[],
        )
        if description is not None or properties:
            dataset_properties = DatasetPropertiesClass(
                description=description,
                customProperties=properties,
                # uri=dataset_name,
            )
            dataset_snapshot.aspects.append(dataset_properties)
        schema_metadata = get_schema_metadata(
            self.report, dataset_name, self.platform, columns
        )
        dataset_snapshot.aspects.append(schema_metadata)

        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu

def get_report(self):
    """Return the SQLSourceReport accumulated during ingestion."""
    return self.report
Expand Down