From 8163c7b922a2ec9004618c7e62b2fe44825bf34f Mon Sep 17 00:00:00 2001 From: Vijayan Nallasami Date: Tue, 15 Jun 2021 10:23:07 +0100 Subject: [PATCH 1/3] removed key not in catalog filter and added node filter using AllowDenyPattern --- .../src/datahub/ingestion/source/dbt.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py index 368833e95323e3..90047a99c13c70 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py @@ -28,6 +28,7 @@ StringTypeClass, ) from datahub.metadata.schema_classes import DatasetPropertiesClass +from datahub.configuration.common import AllowDenyPattern logger = logging.getLogger(__name__) @@ -38,6 +39,7 @@ class DBTConfig(ConfigModel): env: str = "PROD" target_platform: str load_schemas: bool + node_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() class DBTColumn: @@ -91,24 +93,25 @@ def extract_dbt_entities( load_catalog: bool, target_platform: str, environment: str, + node_pattern: AllowDenyPattern, ) -> List[DBTNode]: dbt_entities = [] for key in nodes: node = nodes[key] dbtNode = DBTNode() - if key not in catalog and load_catalog is False: + # check if node pattern allowed based on config file + if not node_pattern.allowed(node["resource_type"]): continue - - if "identifier" in node and load_catalog is False: - dbtNode.name = node["identifier"] - else: - dbtNode.name = node["name"] dbtNode.dbt_name = key dbtNode.database = node["database"] dbtNode.schema = node["schema"] dbtNode.dbt_file_path = node["original_file_path"] dbtNode.node_type = node["resource_type"] + if "identifier" in node and load_catalog is False: + dbtNode.name = node["identifier"] + else: + dbtNode.name = node["name"] if "materialized" in node["config"].keys(): # It's a model @@ -154,6 +157,7 @@ def loadManifestAndCatalog( load_catalog: bool, target_platform: str, environment: str, + node_pattern: AllowDenyPattern, ) -> List[DBTNode]: with open(manifest_path, "r") as manifest: with open(catalog_path, "r") as catalog: @@ -176,6 +180,7 @@ def loadManifestAndCatalog( load_catalog, target_platform, environment, + node_pattern, ) return nodes @@ -339,6 +344,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: self.config.load_schemas, self.config.target_platform, self.config.env, + self.config.node_pattern, ) for node in nodes: From e838fce95f67e609681f61f63bf66802a5f1f735 Mon Sep 17 00:00:00 2001 From: Vijayan Nallasami Date: Wed, 16 Jun 2021 08:54:20 +0100 Subject: [PATCH 2/3] renamed node_pattern as node_type_pattern --- .../src/datahub/ingestion/source/dbt.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py index 90047a99c13c70..c7a98315c06f40 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py @@ -39,7 +39,7 @@ class DBTConfig(ConfigModel): env: str = "PROD" target_platform: str load_schemas: bool - node_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + node_type_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() class DBTColumn: @@ -93,7 +93,7 @@ def extract_dbt_entities( load_catalog: bool, target_platform: str, environment: str, - node_pattern: AllowDenyPattern, + node_type_pattern: AllowDenyPattern, ) -> List[DBTNode]: dbt_entities = [] for key in nodes: @@ -101,7 +101,7 @@ def extract_dbt_entities( dbtNode = DBTNode() # check if node pattern allowed based on config file - if not node_pattern.allowed(node["resource_type"]): + if not node_type_pattern.allowed(node["resource_type"]): continue dbtNode.dbt_name = key dbtNode.database = node["database"] @@ -157,7 +157,7 @@ def loadManifestAndCatalog( load_catalog: bool, target_platform: str, environment: str, - node_pattern: AllowDenyPattern, + node_type_pattern: AllowDenyPattern, ) -> List[DBTNode]: with open(manifest_path, "r") as manifest: with open(catalog_path, "r") as catalog: @@ -180,7 +180,7 @@ def loadManifestAndCatalog( load_catalog, target_platform, environment, - node_pattern, + node_type_pattern, ) return nodes @@ -344,7 +344,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: self.config.load_schemas, self.config.target_platform, self.config.env, - self.config.node_pattern, + self.config.node_type_pattern, ) for node in nodes: From a12912ba7aa1b69103d42bcd55334db32639044f Mon Sep 17 00:00:00 2001 From: Vijayan Nallasami Date: Thu, 17 Jun 2021 10:04:03 +0100 Subject: [PATCH 3/3] fixed checkstyle issue --- metadata-ingestion/src/datahub/ingestion/source/dbt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py index c7a98315c06f40..97199417e45ce9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py @@ -5,6 +5,7 @@ from typing import Any, Dict, Iterable, List from datahub.configuration import ConfigModel +from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -28,7 +29,6 @@ StringTypeClass, ) from datahub.metadata.schema_classes import DatasetPropertiesClass -from datahub.configuration.common import AllowDenyPattern logger = logging.getLogger(__name__)