Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
removed key not in catalog filter and added node filter using AllowDe…
…nyPattern
  • Loading branch information
vijayan-nallasami-curve committed Jun 15, 2021
commit 8163c7b922a2ec9004618c7e62b2fe44825bf34f
18 changes: 12 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
StringTypeClass,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.configuration.common import AllowDenyPattern

logger = logging.getLogger(__name__)

Expand All @@ -38,6 +39,7 @@ class DBTConfig(ConfigModel):
env: str = "PROD"
target_platform: str
load_schemas: bool
node_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()


class DBTColumn:
Expand Down Expand Up @@ -91,24 +93,25 @@ def extract_dbt_entities(
load_catalog: bool,
target_platform: str,
environment: str,
node_pattern: AllowDenyPattern,
) -> List[DBTNode]:
dbt_entities = []
for key in nodes:
node = nodes[key]
dbtNode = DBTNode()

if key not in catalog and load_catalog is False:
# check if node pattern allowed based on config file
if not node_pattern.allowed(node["resource_type"]):
continue

if "identifier" in node and load_catalog is False:
dbtNode.name = node["identifier"]
else:
dbtNode.name = node["name"]
dbtNode.dbt_name = key
dbtNode.database = node["database"]
dbtNode.schema = node["schema"]
dbtNode.dbt_file_path = node["original_file_path"]
dbtNode.node_type = node["resource_type"]
if "identifier" in node and load_catalog is False:
dbtNode.name = node["identifier"]
else:
dbtNode.name = node["name"]

if "materialized" in node["config"].keys():
# It's a model
Expand Down Expand Up @@ -154,6 +157,7 @@ def loadManifestAndCatalog(
load_catalog: bool,
target_platform: str,
environment: str,
node_pattern: AllowDenyPattern,
) -> List[DBTNode]:
with open(manifest_path, "r") as manifest:
with open(catalog_path, "r") as catalog:
Expand All @@ -176,6 +180,7 @@ def loadManifestAndCatalog(
load_catalog,
target_platform,
environment,
node_pattern,
)

return nodes
Expand Down Expand Up @@ -339,6 +344,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.config.load_schemas,
self.config.target_platform,
self.config.env,
self.config.node_pattern,
)

for node in nodes:
Expand Down