Skip to content

Commit a7fc76f

Browse files
authored
feat(sql_views): added views as datasets for SQLAlchemy DBs (#2663)
1 parent 1b53922 commit a7fc76f

File tree

2 files changed

+137
-51
lines changed

2 files changed

+137
-51
lines changed

metadata-ingestion/README.md

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -195,8 +195,8 @@ We have two options for the underlying library used to connect to SQL Server: (1
195195

196196
Extracts:
197197

198-
- List of databases, schema, and tables
199-
- Column types associated with each table
198+
- List of databases, schema, tables and views
199+
- Column types associated with each table/view
200200

201201
```yml
202202
source:
@@ -206,6 +206,7 @@ source:
206206
password: pass
207207
host_port: localhost:1433
208208
database: DemoDatabase
209+
include_views: True
209210
table_pattern:
210211
deny:
211212
- "^.*\\.sys_.*" # deny all tables that start with sys_

metadata-ingestion/src/datahub/ingestion/source/sql_common.py

Lines changed: 134 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,11 @@
11
import logging
22
import time
3+
import warnings
34
from abc import abstractmethod
45
from dataclasses import dataclass, field
56
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
67

7-
from sqlalchemy import create_engine
8-
from sqlalchemy.engine import reflection
8+
from sqlalchemy import create_engine, inspect
99
from sqlalchemy.sql import sqltypes as types
1010

1111
from datahub.configuration.common import AllowDenyPattern, ConfigModel
@@ -38,13 +38,28 @@
3838
@dataclass
3939
class SQLSourceReport(SourceReport):
4040
tables_scanned: int = 0
41+
views_scanned: int = 0
4142
filtered: List[str] = field(default_factory=list)
4243

4344
def report_table_scanned(self, table_name: str) -> None:
45+
warnings.warn(
46+
"report_table_scanned is deprecated, please use report_entity_scanned with argument `table`"
47+
)
4448
self.tables_scanned += 1
4549

46-
def report_dropped(self, table_name: str) -> None:
47-
self.filtered.append(table_name)
50+
def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
51+
"""
52+
Entity could be a view or a table
53+
"""
54+
if ent_type == "table":
55+
self.tables_scanned += 1
56+
elif ent_type == "view":
57+
self.views_scanned += 1
58+
else:
59+
raise KeyError(f"Unknown entity {ent_type}.")
60+
61+
def report_dropped(self, ent_name: str) -> None:
62+
self.filtered.append(ent_name)
4863

4964

5065
class SQLAlchemyConfig(ConfigModel):
@@ -56,6 +71,10 @@ class SQLAlchemyConfig(ConfigModel):
5671
# them out afterwards via the table_pattern.
5772
schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
5873
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
74+
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
75+
76+
include_views: Optional[bool] = False
77+
include_tables: Optional[bool] = True
5978

6079
@abstractmethod
6180
def get_sql_alchemy_url(self):
@@ -65,11 +84,11 @@ def get_identifier(self, schema: str, table: str) -> str:
6584
return f"{schema}.{table}"
6685

6786
def standardize_schema_table_names(
68-
self, schema: str, table: str
87+
self, schema: str, entity: str
6988
) -> Tuple[str, str]:
7089
# Some SQLAlchemy dialects need a standardization step to clean the schema
7190
# and table names. See BigQuery for an example of when this is useful.
72-
return schema, table
91+
return schema, entity
7392

7493

7594
class BasicSQLAlchemyConfig(SQLAlchemyConfig):
@@ -207,56 +226,122 @@ def get_workunits(self) -> Iterable[SqlWorkUnit]:
207226
url = sql_config.get_sql_alchemy_url()
208227
logger.debug(f"sql_alchemy_url={url}")
209228
engine = create_engine(url, **sql_config.options)
210-
inspector = reflection.Inspector.from_engine(engine)
229+
inspector = inspect(engine)
211230
for schema in inspector.get_schema_names():
212231
if not sql_config.schema_pattern.allowed(schema):
213232
self.report.report_dropped(schema)
214233
continue
215234

216-
for table in inspector.get_table_names(schema):
217-
schema, table = sql_config.standardize_schema_table_names(schema, table)
218-
dataset_name = sql_config.get_identifier(schema, table)
219-
self.report.report_table_scanned(dataset_name)
220-
221-
if not sql_config.table_pattern.allowed(dataset_name):
222-
self.report.report_dropped(dataset_name)
223-
continue
224-
225-
columns = inspector.get_columns(table, schema)
226-
try:
227-
table_info: dict = inspector.get_table_comment(table, schema)
228-
except NotImplementedError:
229-
description: Optional[str] = None
230-
properties: Dict[str, str] = {}
231-
else:
232-
description = table_info["text"]
233-
234-
# The "properties" field is a non-standard addition to SQLAlchemy's interface.
235-
properties = table_info.get("properties", {})
236-
237-
# TODO: capture inspector.get_pk_constraint
238-
# TODO: capture inspector.get_sorted_table_and_fkc_names
239-
240-
dataset_snapshot = DatasetSnapshot(
241-
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
242-
aspects=[],
243-
)
244-
if description is not None or properties:
245-
dataset_properties = DatasetPropertiesClass(
246-
description=description,
247-
customProperties=properties,
248-
# uri=dataset_name,
249-
)
250-
dataset_snapshot.aspects.append(dataset_properties)
251-
schema_metadata = get_schema_metadata(
252-
self.report, dataset_name, self.platform, columns
235+
if sql_config.include_tables:
236+
yield from self.loop_tables(inspector, schema, sql_config)
237+
238+
if sql_config.include_views:
239+
yield from self.loop_views(inspector, schema, sql_config)
240+
241+
def loop_tables(
242+
self,
243+
inspector: Any,
244+
schema: str,
245+
sql_config: SQLAlchemyConfig,
246+
) -> Iterable[SqlWorkUnit]:
247+
for table in inspector.get_table_names(schema):
248+
schema, table = sql_config.standardize_schema_table_names(schema, table)
249+
dataset_name = sql_config.get_identifier(schema, table)
250+
self.report.report_entity_scanned(dataset_name, ent_type="table")
251+
252+
if not sql_config.table_pattern.allowed(dataset_name):
253+
self.report.report_dropped(dataset_name)
254+
continue
255+
256+
columns = inspector.get_columns(table, schema)
257+
try:
258+
table_info: dict = inspector.get_table_comment(table, schema)
259+
except NotImplementedError:
260+
description: Optional[str] = None
261+
properties: Dict[str, str] = {}
262+
else:
263+
description = table_info["text"]
264+
265+
# The "properties" field is a non-standard addition to SQLAlchemy's interface.
266+
properties = table_info.get("properties", {})
267+
268+
# TODO: capture inspector.get_pk_constraint
269+
# TODO: capture inspector.get_sorted_table_and_fkc_names
270+
271+
dataset_snapshot = DatasetSnapshot(
272+
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
273+
aspects=[],
274+
)
275+
if description is not None or properties:
276+
dataset_properties = DatasetPropertiesClass(
277+
description=description,
278+
customProperties=properties,
279+
# uri=dataset_name,
253280
)
254-
dataset_snapshot.aspects.append(schema_metadata)
281+
dataset_snapshot.aspects.append(dataset_properties)
282+
schema_metadata = get_schema_metadata(
283+
self.report, dataset_name, self.platform, columns
284+
)
285+
dataset_snapshot.aspects.append(schema_metadata)
286+
287+
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
288+
wu = SqlWorkUnit(id=dataset_name, mce=mce)
289+
self.report.report_workunit(wu)
290+
yield wu
291+
292+
def loop_views(
293+
self,
294+
inspector: Any,
295+
schema: str,
296+
sql_config: SQLAlchemyConfig,
297+
) -> Iterable[SqlWorkUnit]:
298+
for view in inspector.get_view_names(schema):
299+
schema, view = sql_config.standardize_schema_table_names(schema, view)
300+
dataset_name = sql_config.get_identifier(schema, view)
301+
self.report.report_entity_scanned(dataset_name, ent_type="view")
302+
303+
if not sql_config.view_pattern.allowed(dataset_name):
304+
self.report.report_dropped(dataset_name)
305+
continue
255306

256-
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
257-
wu = SqlWorkUnit(id=dataset_name, mce=mce)
258-
self.report.report_workunit(wu)
259-
yield wu
307+
columns = inspector.get_columns(view, schema)
308+
try:
309+
view_info: dict = inspector.get_table_comment(view, schema)
310+
except NotImplementedError:
311+
description: Optional[str] = None
312+
properties: Dict[str, str] = {}
313+
else:
314+
description = view_info["text"]
315+
316+
# The "properties" field is a non-standard addition to SQLAlchemy's interface.
317+
properties = view_info.get("properties", {})
318+
319+
view_definition = inspector.get_view_definition(view)
320+
if view_definition is None:
321+
view_definition = ""
322+
properties["view_definition"] = view_definition
323+
properties["is_view"] = "True"
324+
325+
dataset_snapshot = DatasetSnapshot(
326+
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
327+
aspects=[],
328+
)
329+
if description is not None or properties:
330+
dataset_properties = DatasetPropertiesClass(
331+
description=description,
332+
customProperties=properties,
333+
# uri=dataset_name,
334+
)
335+
dataset_snapshot.aspects.append(dataset_properties)
336+
schema_metadata = get_schema_metadata(
337+
self.report, dataset_name, self.platform, columns
338+
)
339+
dataset_snapshot.aspects.append(schema_metadata)
340+
341+
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
342+
wu = SqlWorkUnit(id=dataset_name, mce=mce)
343+
self.report.report_workunit(wu)
344+
yield wu
260345

261346
def get_report(self):
262347
return self.report

0 commit comments

Comments
 (0)