Documentation updates to using datahub as a library with more emitter and lineage examples.
rslanka committed Nov 18, 2021
commit cdfa9f77b6ed88bcf8d5d9793bbbe3a850844129
16 changes: 14 additions & 2 deletions metadata-ingestion/README.md
@@ -163,8 +163,20 @@ Check out the [transformers guide](./transformers.md) for more info!

In some cases, you might want to construct the MetadataChangeEvents yourself but still use this framework to emit that metadata to DataHub. In this case, take a look at the emitter interfaces, which can easily be imported and called from your own code.

- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`). Basic usage [example](./examples/library/lineage_emitter_rest.py).
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`). Basic usage [example](./examples/library/lineage_emitter_kafka.py).
- [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`).
- [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`).
### Sample code
#### Lineage
- [Emitting simple lineage via REST as a MetadataChangeEvent](./examples/library/lineage_emitter_rest.py) (sketched below).
- [Emitting simple lineage via Kafka as a MetadataChangeEvent](./examples/library/lineage_emitter_kafka.py).
- [Emitting simple lineage via REST as a MetadataChangeProposalWrapper](./examples/library/lineage_emitter_mcpw_rest.py).
- [DataHub's Snowflake lineage as MetadataChangeProposalWrapper](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py#L249).
- [DataHub's dbt lineage as MetadataChangeEvent](https://github.com/linkedin/datahub/blob/a9754ebe83b6b73bc2bfbf49d9ebf5dbd2ca5a8f/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L625,L630).
- [DataHub's BigQuery lineage as MetadataChangeProposalWrapper](https://github.com/linkedin/datahub/blob/a1bf95307b040074c8d65ebb86b5eb177fdcd591/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py#L229).
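
For reference, the first example above boils down to a few lines. This is a minimal sketch, assuming a DataHub GMS instance reachable at `http://localhost:8080`; the BigQuery dataset names are placeholders:

```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Build a lineage MetadataChangeEvent: two upstream tables feed one downstream table.
# The dataset names used here are placeholders.
lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
        builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
    ],
    builder.make_dataset_urn("bigquery", "downstream", "PROD"),
)

# Emit the MCE to the GMS REST endpoint.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mce(lineage_mce)
```

The Kafka example linked above follows the same pattern, except that it emits asynchronously and flushes the producer before exiting.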
#### Programmatic Pipeline
In some cases, you might want to configure and run a pipeline entirely from within your custom Python script. Here is an example of how to do it; a condensed sketch follows the link below.
- [programmatic_pipeline.py](./examples/library/programatic_pipeline.py) - a basic MySQL-to-REST programmatic pipeline.
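
As a rough illustration of what that script contains, here is a minimal sketch, assuming a local MySQL database and a DataHub GMS at `http://localhost:8080`; all connection values are placeholders, and the config dict mirrors the structure of a YAML recipe:

```python
from datahub.ingestion.run.pipeline import Pipeline

# The dict below uses the same source/sink structure as a YAML ingestion recipe.
# All connection values are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "mysql",
            "config": {
                "username": "user",
                "password": "pass",
                "database": "my_db",
                "host_port": "localhost:3306",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)

# Run the ingestion and surface any failures.
pipeline.run()
pipeline.raise_from_status()
```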


## Lineage with Airflow

42 changes: 42 additions & 0 deletions metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py
@@ -0,0 +1,42 @@
from typing import List

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineage,
)
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct upstream tables.
upstream_tables: List[UpstreamClass] = []
upstream_table_1 = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table_1)
upstream_table_2 = UpstreamClass(
    dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table_2)

# Construct a lineage object.
upstream_lineage = UpstreamLineage(upstreams=upstream_tables)

# Construct a MetadataChangeProposalWrapper object.
lineage_mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "downstream"),
    aspectName="upstreamLineage",
    aspect=upstream_lineage,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(lineage_mcp)
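
# Note: a MetadataChangeProposalWrapper targets a single aspect of one entity
# (here, the "upstreamLineage" aspect of the downstream dataset), whereas a
# MetadataChangeEvent carries a full entity snapshot. The REST emitter call is
# synchronous and raises an exception if the server rejects the request.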