Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ We have two options for the underlying library used to connect to SQL Server: (1

Extracts:

- List of databases, schema, and tables
- Column types associated with each table
- List of databases, schema, tables and views
- Column types associated with each table/view

```yml
source:
Expand All @@ -205,6 +205,7 @@ source:
password: pass
host_port: localhost:1433
database: DemoDatabase
include_views: True
table_pattern:
deny:
- "^.*\\.sys_.*" # deny all tables that start with sys_
Expand Down
156 changes: 111 additions & 45 deletions metadata-ingestion/src/datahub/ingestion/source/sql_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import time
import warnings
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
Expand Down Expand Up @@ -38,13 +39,26 @@
@dataclass
class SQLSourceReport(SourceReport):
tables_scanned: int = 0
views_scanned: int = 0
filtered: List[str] = field(default_factory=list)

def report_table_scanned(self, table_name: str) -> None:
warnings.warn("report_table_scanned is deprecated, please use report_entity_scanned with argument `table`")
self.tables_scanned += 1

def report_dropped(self, table_name: str) -> None:
self.filtered.append(table_name)
def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
"""
Entity could be a view or a table
"""
if ent_type == "table":
self.tables_scanned += 1
elif ent_type == "view":
self.views_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

def report_dropped(self, ent_name: str) -> None:
self.filtered.append(ent_name)


class SQLAlchemyConfig(ConfigModel):
Expand All @@ -56,6 +70,10 @@ class SQLAlchemyConfig(ConfigModel):
# them out afterwards via the table_pattern.
schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()

include_views: Optional[bool] = False
include_tables: Optional[bool] = True

@abstractmethod
def get_sql_alchemy_url(self):
Expand Down Expand Up @@ -213,50 +231,98 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
self.report.report_dropped(schema)
continue

for table in inspector.get_table_names(schema):
schema, table = sql_config.standardize_schema_table_names(schema, table)
dataset_name = sql_config.get_identifier(schema, table)
self.report.report_table_scanned(dataset_name)

if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue

columns = inspector.get_columns(table, schema)
try:
table_info: dict = inspector.get_table_comment(table, schema)
except NotImplementedError:
description: Optional[str] = None
properties: Dict[str, str] = {}
else:
description = table_info["text"]

# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = table_info.get("properties", {})

# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
if sql_config.include_tables:
for table in inspector.get_table_names(schema):
schema, table = sql_config.standardize_schema_table_names(schema, table)
dataset_name = sql_config.get_identifier(schema, table)
self.report.report_entity_scanned(dataset_name, ent_type="table")

if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue

columns = inspector.get_columns(table, schema)
try:
table_info: dict = inspector.get_table_comment(table, schema)
except NotImplementedError:
description: Optional[str] = None
properties: Dict[str, str] = {}
else:
description = table_info["text"]

# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = table_info.get("properties", {})

# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(schema_metadata)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
wu = SqlWorkUnit(id=dataset_name, mce=mce)
self.report.report_workunit(wu)
yield wu
dataset_snapshot.aspects.append(schema_metadata)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
wu = SqlWorkUnit(id=dataset_name, mce=mce)
self.report.report_workunit(wu)
yield wu

if sql_config.include_views:
for view in inspector.get_view_names(schema):
# TODO : change "standardize_schema_table_names" function name: it will be the same for tables and views
schema, view = sql_config.standardize_schema_table_names(schema, view)
dataset_name = sql_config.get_identifier(schema, view)
self.report.report_entity_scanned(dataset_name, ent_type="view")

if not sql_config.view_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue

columns = inspector.get_columns(view, schema)
try:
view_info: dict = inspector.get_table_comment(view, schema)
except NotImplementedError:
description: Optional[str] = None
properties: Dict[str, str] = {}
else:
description = view_info["text"]

# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = view_info.get("properties", {})

# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
)
dataset_snapshot.aspects.append(schema_metadata)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
wu = SqlWorkUnit(id=dataset_name, mce=mce)
self.report.report_workunit(wu)
yield wu

def get_report(self):
return self.report
Expand Down