diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py index 697bb614a7d6c..be239b0880fdd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py @@ -1,12 +1,12 @@ import json import logging -import time from typing import Any, Dict, Iterable, List, Optional import dateutil.parser from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern +from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.dbt_types import ( @@ -260,7 +260,8 @@ def get_upstreams( def get_upstream_lineage(upstream_urns: List[str]) -> UpstreamLineage: ucl: List[UpstreamClass] = [] - actor, sys_time = "urn:li:corpuser:dbt_executor", int(time.time()) * 1000 + actor = "urn:li:corpuser:dbt_executor" + sys_time = get_sys_time() for dep in upstream_urns: uc = UpstreamClass( @@ -329,7 +330,8 @@ def get_schema_metadata( canonical_schema.append(field) - actor, sys_time = "urn:li:corpuser:dbt_executor", int(time.time() * 1000) + actor = "urn:li:corpuser:dbt_executor" + sys_time = get_sys_time() last_modified = sys_time diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index 1692760ac327f..3cb844cf6baf9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -1,5 +1,4 @@ import json -import time import typing from collections import defaultdict from dataclasses import dataclass @@ -279,7 +278,7 @@ def process_dataflow_node( OwnershipClass( owners=[], lastModified=AuditStampClass( - time=int(time.time() * 1000), + time=mce_builder.get_sys_time(), actor="urn:li:corpuser:datahub", ), ) @@ -596,7 +595,7 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata: platformSchema=MySqlDDL(tableSchema=""), ) - sys_time = int(time.time() * 1000) + sys_time = mce_builder.get_sys_time() dataset_snapshot = DatasetSnapshot( urn=f"urn:li:dataset:(urn:li:dataPlatform:glue,{table_name},{self.env})", aspects=[], diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index d61ebe154201b..51338843d2a14 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -1,5 +1,4 @@ import logging -import time from dataclasses import dataclass, field from typing import Iterable, List @@ -10,6 +9,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig +from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -87,7 +87,8 @@ def _extract_record(self, topic: str) -> MetadataChangeEvent: logger.debug(f"topic = {topic}") platform = "kafka" dataset_name = topic - actor, sys_time = "urn:li:corpuser:etl", int(time.time() * 1000) + actor = "urn:li:corpuser:etl" + sys_time = get_sys_time() dataset_snapshot = DatasetSnapshot( urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.source_config.env})", diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 8bd26b617842a..aaf62a487cd45 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -1,6 +1,5 @@ import logging import re -import time from dataclasses import dataclass, field from typing import Dict, Iterable, List, Optional @@ -347,7 +346,7 @@ def construct_lineage_workunits( ), type=models.DatasetLineageTypeClass.TRANSFORMED, auditStamp=models.AuditStampClass( - time=int(time.time() * 1000), + time=builder.get_sys_time(), actor="urn:li:corpuser:datahub", ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py index e3de5b38cd123..da36dd46652ac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py @@ -2,7 +2,6 @@ import json import logging import os -import time from dataclasses import dataclass from dataclasses import field as dataclass_field from typing import Any, Iterable, List, MutableMapping, Optional, Sequence @@ -18,6 +17,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern +from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -304,7 +304,7 @@ def _make_chart_mce( self, dashboard_element: LookerDashboardElement ) -> MetadataChangeEvent: actor = self.source_config.actor - sys_time = int(time.time()) * 1000 + sys_time = get_sys_time() chart_urn = f"urn:li:chart:({self.source_config.platform_name},{dashboard_element.get_urn_element_id()})" chart_snapshot = ChartSnapshot( urn=chart_urn, @@ -338,7 +338,7 @@ def _make_dashboard_and_chart_mces( self, looker_dashboard: LookerDashboard ) -> List[MetadataChangeEvent]: actor = self.source_config.actor - sys_time = int(time.time()) * 1000 + sys_time = get_sys_time() chart_mces = [ self._make_chart_mce(element) diff --git a/metadata-ingestion/src/datahub/ingestion/source/lookml.py b/metadata-ingestion/src/datahub/ingestion/source/lookml.py index f270c1aa82132..823d8fc670bbf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/lookml.py +++ b/metadata-ingestion/src/datahub/ingestion/source/lookml.py @@ -2,7 +2,6 @@ import logging import re import sys -import time from dataclasses import dataclass from dataclasses import field as dataclass_field from dataclasses import replace @@ -18,6 +17,7 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern +from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -505,7 +505,7 @@ def _build_dataset_mce(self, looker_view: LookerView) -> MetadataChangeEvent: dataset_name = looker_view.view_name actor = self.source_config.actor - sys_time = int(time.time()) * 1000 + sys_time = get_sys_time() dataset_snapshot = DatasetSnapshot( urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.source_config.platform_name},{dataset_name},{self.source_config.env})", diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index b64e744a13a85..dda130e7c70da 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -1,4 +1,3 @@ -import time from collections import Counter from dataclasses import dataclass, field from typing import Any @@ -12,6 +11,7 @@ from pymongo.mongo_client import MongoClient from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -468,7 +468,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # create schema metadata object for collection actor = "urn:li:corpuser:etl" - sys_time = int(time.time() * 1000) + sys_time = get_sys_time() schema_metadata = SchemaMetadata( schemaName=collection_name, platform=f"urn:li:dataPlatform:{platform}", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py index 565244ccfbe87..1a433960599b9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql_common.py @@ -1,5 +1,4 @@ import logging -import time import warnings from abc import abstractmethod from dataclasses import dataclass, field @@ -9,6 +8,7 @@ from sqlalchemy.sql import sqltypes as types from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.metadata_common import MetadataWorkUnit @@ -194,7 +194,7 @@ def get_schema_metadata( canonical_schema.append(field) actor = "urn:li:corpuser:etl" - sys_time = int(time.time() * 1000) + sys_time = get_sys_time() schema_metadata = SchemaMetadata( schemaName=dataset_name, platform=f"urn:li:dataPlatform:{platform}", diff --git a/metadata-ingestion/tests/test_helpers/mce_helpers.py b/metadata-ingestion/tests/test_helpers/mce_helpers.py index e79415b94e412..c30fd6a04201e 100644 --- a/metadata-ingestion/tests/test_helpers/mce_helpers.py +++ b/metadata-ingestion/tests/test_helpers/mce_helpers.py @@ -18,9 +18,11 @@ def assert_mces_equal(output: object, golden: object) -> None: # Ignore timestamps from the ETL pipeline. A couple examples: # root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.common.Ownership']['lastModified']['time'] # root[69]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.schema.SchemaMetadata']['lastModified']['time']" + # root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][1]['com.linkedin.pegasus2avro.dataset.UpstreamLineage']['upstreams'][0]['auditStamp']['time'] r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['lastModified'\]\['time'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]", + r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['auditStamp'\]\['time'\]", } diff = deepdiff.DeepDiff(golden, output, exclude_regex_paths=ignore_paths) assert not diff, str(diff)