Added chart->dashboard, dataset->chart and job->dataflow lineage emission samples + updated docs
rslanka committed Nov 19, 2021
commit de01e0796e23ff8e5d969edc07695af639d48f98
8 changes: 6 additions & 2 deletions metadata-ingestion/README.md
@@ -167,8 +167,12 @@ In some cases, you might want to construct the MetadataChangeEvents yourself but
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`).
### Sample code
#### Lineage
The following samples cover emitting dataset-to-dataset, dataset-to-job-to-dataset, dataset-to-chart, chart-to-dashboard and job-to-dataflow lineage; a minimal sketch of the dataset-to-dataset case appears after this list.
- [lineage_emitter_mcpw_rest.py](./examples/library/lineage_emitter_mcpw_rest.py) - emits simple bigquery table-to-table (dataset-to-dataset) lineage via REST as MetadataChangeProposalWrapper.
- [lineage_dataset_job_dataset.py](./examples/library/lineage_dataset_job_dataset.py) - emits mysql-to-airflow-to-kafka (dataset-to-job-to-dataset) lineage via REST as MetadataChangeProposalWrapper.
- [lineage_dataset_chart.py](./examples/library/lineage_dataset_chart.py) - emits dataset-to-chart lineage via REST as MetadataChangeProposalWrapper.
- [lineage_chart_dashboard.py](./examples/library/lineage_chart_dashboard.py) - emits chart-to-dashboard lineage via REST as MetadataChangeProposalWrapper.
- [lineage_job_dataflow.py](./examples/library/lineage_job_dataflow.py) - emits job-to-dataflow lineage via REST as MetadataChangeProposalWrapper.
- [lineage_emitter_rest.py](./examples/library/lineage_emitter_rest.py) - emits simple dataset-to-dataset lineage via REST as MetadataChangeEvent.
- [lineage_emitter_kafka.py](./examples/library/lineage_emitter_kafka.py) - emits simple dataset-to-dataset lineage via Kafka as MetadataChangeEvent.
- [Datahub Snowflake Lineage](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py#L249) - emits Datahub's Snowflake lineage as MetadataChangeProposalWrapper.
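
For orientation, here is a minimal sketch of the dataset-to-dataset case referenced above, assuming the `UpstreamClass`, `UpstreamLineageClass` and `DatasetLineageTypeClass` aspect classes exposed by `datahub.metadata.schema_classes` (the table names are placeholders); the linked `lineage_emitter_mcpw_rest.py` is the authoritative sample:

```python
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Upstream (source) datasets feeding the downstream dataset.
upstreams: List[UpstreamClass] = [
    UpstreamClass(
        dataset=builder.make_dataset_urn(platform="bigquery", name="upstream_table_1", env="PROD"),
        type=DatasetLineageTypeClass.TRANSFORMED,
    ),
    UpstreamClass(
        dataset=builder.make_dataset_urn(platform="bigquery", name="upstream_table_2", env="PROD"),
        type=DatasetLineageTypeClass.TRANSFORMED,
    ),
]

# The upstreamLineage aspect is attached to the downstream dataset.
lineage_mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn(platform="bigquery", name="downstream_table", env="PROD"),
    aspectName="upstreamLineage",
    aspect=UpstreamLineageClass(upstreams=upstreams),
)

# Emit to a local GMS REST endpoint.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(lineage_mcp)
```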
39 changes: 39 additions & 0 deletions metadata-ingestion/examples/library/lineage_chart_dashboard.py
@@ -0,0 +1,39 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dashboard import DashboardInfoClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass

# Construct the DashboardInfo aspect with the charts -> dashboard lineage.
charts_in_dashboard: List[str] = [
    builder.make_chart_urn(platform="looker", name="chart_1"),
    builder.make_chart_urn(platform="looker", name="chart_2"),
]

last_modified = ChangeAuditStampsClass()


dashboard_info = DashboardInfoClass(
    title="My Dashboard 1",
    description="Sample dashboard",
    lastModified=last_modified,
    charts=charts_in_dashboard,
)

# Construct a MetadataChangeProposalWrapper object with the DashboardInfo aspect.
# NOTE: This will overwrite all of the existing dashboardInfo aspect information associated with this dashboard.
dashboard_info_mcp = MetadataChangeProposalWrapper(
    entityType="dashboard",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dashboard_urn(platform="looker", name="my_dashboard_1"),
    aspectName="dashboardInfo",
    aspect=dashboard_info,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(dashboard_info_mcp)
38 changes: 38 additions & 0 deletions metadata-ingestion/examples/library/lineage_dataset_chart.py
@@ -0,0 +1,38 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.chart import ChartInfoClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass

# Construct the ChartInfo aspect with the input_datasets lineage.
input_datasets: List[str] = [
    builder.make_dataset_urn(platform="hdfs", name="dataset1", env="PROD"),
    builder.make_dataset_urn(platform="hdfs", name="dataset2", env="PROD"),
]

last_modified = ChangeAuditStampsClass()

chart_info = ChartInfoClass(
    title="Baz Chart 1",
    description="Sample Baz chart",
    lastModified=last_modified,
    inputs=input_datasets,
)

# Construct a MetadataChangeProposalWrapper object with the ChartInfo aspect.
# NOTE: This will overwrite all of the existing chartInfo aspect information associated with this chart.
chart_info_mcp = MetadataChangeProposalWrapper(
    entityType="chart",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_chart_urn(platform="looker", name="my_chart_1"),
    aspectName="chartInfo",
    aspect=chart_info,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(chart_info_mcp)
30 changes: 30 additions & 0 deletions metadata-ingestion/examples/library/lineage_job_dataflow.py
@@ -0,0 +1,30 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInfoClass
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct the DataJobInfo aspect with the job -> flow lineage.
dataflow_urn = builder.make_data_flow_urn(
    orchestrator="airflow", flow_id="flow1", cluster="prod"
)

datajob_info = DataJobInfoClass(name="My Job 1", type="AIRFLOW", flowUrn=dataflow_urn)

# Construct a MetadataChangeProposalWrapper object with the DataJobInfo aspect.
# NOTE: This will overwrite all of the existing dataJobInfo aspect information associated with this job.
datajob_info_mcp = MetadataChangeProposalWrapper(
    entityType="dataJob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="prod"
    ),
    aspectName="dataJobInfo",
    aspect=datajob_info,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(datajob_info_mcp)
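
As a complement to the job-to-dataflow sample above, here is a minimal sketch of the dataset-to-job-to-dataset case from the README list, assuming the `DataJobInputOutputClass` aspect from `datahub.metadata.schema_classes` (the dataset names are placeholders); the linked `lineage_dataset_job_dataset.py` is the authoritative sample:

```python
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass

# Datasets read and written by the job.
input_datasets: List[str] = [
    builder.make_dataset_urn(platform="mysql", name="librarydb.member", env="PROD"),
]
output_datasets: List[str] = [
    builder.make_dataset_urn(platform="kafka", name="library.member_checkout", env="PROD"),
]

# The dataJobInputOutput aspect carries the dataset -> job -> dataset lineage.
datajob_io = DataJobInputOutputClass(
    inputDatasets=input_datasets,
    outputDatasets=output_datasets,
)

datajob_io_mcp = MetadataChangeProposalWrapper(
    entityType="dataJob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_job_urn(
        orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="prod"
    ),
    aspectName="dataJobInputOutput",
    aspect=datajob_io,
)

# Emit to a local GMS REST endpoint.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(datajob_io_mcp)
```

Each of these samples is a standalone script; with a GMS instance reachable at `http://localhost:8080`, it can be run directly, e.g. `python lineage_job_dataflow.py`.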