11 changes: 11 additions & 0 deletions metadata-ingestion/examples/recipes/example_to_datahub_kafka.yml
@@ -0,0 +1,11 @@
source:
  type: "file"
  config:
    filename: "./examples/mce_files/bootstrap_mce.json"

sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: localhost:9092
      schema_registry_url: http://localhost:8081
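A recipe like this is normally executed with the CLI (`datahub ingest -c examples/recipes/example_to_datahub_kafka.yml`). As a rough equivalent using DataHub's Pipeline API, where the dict simply mirrors the YAML above (illustrative, not part of this PR):

# Rough programmatic equivalent of `datahub ingest -c ...` for this recipe.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "file",
            "config": {"filename": "./examples/mce_files/bootstrap_mce.json"},
        },
        "sink": {
            "type": "datahub-kafka",
            "config": {
                "connection": {
                    "bootstrap": "localhost:9092",
                    "schema_registry_url": "http://localhost:8081",
                }
            },
        },
    }
)
pipeline.run()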
94 changes: 81 additions & 13 deletions metadata-ingestion/scripts/avro_codegen.py
@@ -1,17 +1,58 @@
import json
import types
import unittest.mock
from pathlib import Path
-from typing import Union
+from typing import Dict, Iterable, List, Union

import avro.schema
import click
from avrogen import write_schema_files


def load_schema_file(schema_file: str) -> str:
    with open(schema_file) as f:
        raw_schema_text = f.read()

    redo_spaces = json.dumps(json.loads(raw_schema_text), indent=2)
    return redo_spaces


def merge_schemas(schemas: List[str]) -> str:
    # Combine schemas.
    schemas_obj = [json.loads(schema) for schema in schemas]
    merged = ["null"] + schemas_obj

    # Deduplicate repeated names.
    def Register(self, schema):
        if schema.fullname in self._names:
            # print(f"deduping {schema.fullname}")
            pass
        else:
            self._names[schema.fullname] = schema

    with unittest.mock.patch("avro.schema.Names.Register", Register):
        cleaned_schema = avro.schema.SchemaFromJSONData(merged)

    # Convert back to an Avro schema JSON representation.
    class MappingProxyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, types.MappingProxyType):
                return dict(obj)
            return json.JSONEncoder.default(self, obj)

    out_schema = cleaned_schema.to_json()
    encoded = json.dumps(out_schema, cls=MappingProxyEncoder, indent=2)
    return encoded
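Review note on the `Register` patch: avro's parser raises a "name already exists" style SchemaParseException when a named type is defined twice, which is exactly what happens when several top-level schemas embed the same record. Patching `Register` to skip duplicates lets the shared definitions collapse into a single one in the merged union. A hypothetical illustration (the `Shared`/`A`/`B` schemas are invented for this sketch):

# Invented example: records A and B both embed a record named "Shared".
# Unpatched, parsing the union ["null", A, B] fails on the second
# definition of Shared; with the patch, the duplicate is skipped.
shared = '{"type": "record", "name": "Shared", "fields": [{"name": "x", "type": "int"}]}'
schema_a = '{"type": "record", "name": "A", "fields": [{"name": "s", "type": ' + shared + "}]}"
schema_b = '{"type": "record", "name": "B", "fields": [{"name": "s", "type": ' + shared + "}]}"

merged = merge_schemas([schema_a, schema_b])  # JSON text for the union ["null", A, B]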


autogen_header = """# flake8: noqa

# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!

# fmt: off
"""
autogen_footer = "# fmt: on\n"


def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
@@ -27,28 +68,55 @@
        f.seek(0, 0)
        f.write(autogen_header)
        f.write(contents)
-        f.write("# fmt: on\n")
+        f.write(autogen_footer)


-@click.command()
-@click.argument("schema_file", type=click.Path(exists=True))
-@click.argument("outdir", type=click.Path())
-def generate(schema_file: str, outdir: str) -> None:
-    with open(schema_file) as f:
-        raw_schema_text = f.read()
+load_schema_method = """
+import functools
+import pathlib
+
+def _load_schema(schema_name: str) -> str:
+    return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()
+"""
+individual_schema_method = """
+@functools.lru_cache(maxsize=None)
+def get{schema_name}Schema() -> str:
+    return _load_schema("{schema_name}")
+"""

-    no_spaces_schema = json.dumps(json.loads(raw_schema_text))
-    schema_json = no_spaces_schema.replace(
-        '{"type": "string", "avro.java.string": "String"}', '"string"'
-    )

+def make_load_schema_methods(schemas: Iterable[str]) -> str:
+    return load_schema_method + "".join(
+        individual_schema_method.format(schema_name=schema) for schema in schemas
+    )

-    redo_spaces = json.dumps(json.loads(schema_json), indent=2)
-
-    write_schema_files(redo_spaces, outdir)

+@click.command()
+@click.argument("schema_files", type=click.Path(exists=True), nargs=-1, required=True)
+@click.argument("outdir", type=click.Path(), required=True)
+def generate(schema_files: List[str], outdir: str) -> None:
+    schemas: Dict[str, str] = {}
+    for schema_file in schema_files:
+        schema = load_schema_file(schema_file)
+        schemas[Path(schema_file).stem] = schema
+
+    merged_schema = merge_schemas(list(schemas.values()))
+
+    write_schema_files(merged_schema, outdir)
+    with open(f"{outdir}/__init__.py", "w"):
+        # Truncate this file.
+        pass
+
+    # Save raw schema files in codegen as well.
+    schema_save_dir = Path(outdir) / "schemas"
+    schema_save_dir.mkdir()
+    for schema_out_file, schema in schemas.items():
+        (schema_save_dir / f"{schema_out_file}.avsc").write_text(schema)
+
+    # Add load_schema method.
+    with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
+        schema_dir_init.write(make_load_schema_methods(schemas.keys()))

    # Add headers for all generated files
    generated_files = Path(outdir).glob("**/*.py")
    for file in generated_files:
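Taken together, the two string templates above mean that for the MXE inputs wired up in codegen.sh below, the generated `schemas/__init__.py` should come out roughly as follows (a sketch derived from `load_schema_method` and `individual_schema_method`; the real file also gets the autogen header prepended by the loop above):

# Approximate contents of the generated schemas/__init__.py (illustrative).
import functools
import pathlib


def _load_schema(schema_name: str) -> str:
    return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()


@functools.lru_cache(maxsize=None)
def getMetadataChangeEventSchema() -> str:
    return _load_schema("MetadataChangeEvent")


@functools.lru_cache(maxsize=None)
def getMetadataAuditEventSchema() -> str:
    return _load_schema("MetadataAuditEvent")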
6 changes: 3 additions & 3 deletions metadata-ingestion/scripts/codegen.sh
@@ -5,8 +5,8 @@ OUTDIR=./src/datahub/metadata

# Note: this assumes that datahub has already been built with `./gradlew build`.
DATAHUB_ROOT=..
-cp $DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc .
+SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe"
+FILES="$SCHEMAS_ROOT/MetadataChangeEvent.avsc $SCHEMAS_ROOT/MetadataAuditEvent.avsc"

rm -r $OUTDIR || true
-python scripts/avro_codegen.py MetadataChangeEvent.avsc $OUTDIR
-rm MetadataChangeEvent.avsc
+python scripts/avro_codegen.py $FILES $OUTDIR
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -250,6 +250,7 @@ def get_long_description():
    package_data={
        "datahub": ["py.typed"],
        "datahub.metadata": ["schema.avsc"],
+        "datahub.metadata.schemas": ["*.avsc"],
    },
    entry_points=entry_points,
    # Dependencies.
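The new `datahub.metadata.schemas` entry is what gets the raw `.avsc` files copied into built wheels; without it, the generated `_load_schema` helper would hit a FileNotFoundError in an installed package. A hypothetical sanity check on an installed build (assumes Python 3.9+ for `importlib.resources.files`):

# Hypothetical check that the .avsc files shipped with the package.
import importlib.resources

avsc_files = [
    entry.name
    for entry in importlib.resources.files("datahub.metadata.schemas").iterdir()
    if entry.name.endswith(".avsc")
]
print(avsc_files)  # expect e.g. ['MetadataChangeEvent.avsc', 'MetadataAuditEvent.avsc']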
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/kafka_emitter.py
@@ -9,7 +9,7 @@
from datahub.configuration.common import ConfigModel
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
-from datahub.metadata.schema_classes import SCHEMA_JSON_STR
+from datahub.metadata.schemas import getMetadataChangeEventSchema

DEFAULT_KAFKA_TOPIC = "MetadataChangeEvent_v4"

@@ -38,7 +38,7 @@ def convert_mce_to_dict(
            return tuple_encoding

        avro_serializer = AvroSerializer(
-            schema_str=SCHEMA_JSON_STR,
+            schema_str=getMetadataChangeEventSchema(),
            schema_registry_client=schema_registry_client,
            to_dict=convert_mce_to_dict,
        )
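For context, the serializer built here is what gets attached to the underlying confluent-kafka producer. A minimal sketch of that wiring, assuming confluent-kafka's SerializingProducer and illustrative connection values (as in the example recipe above):

# Hedged sketch: wiring the serializer into a producer (connection values illustrative).
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(
    schema_str=getMetadataChangeEventSchema(),
    schema_registry_client=schema_registry_client,
    to_dict=convert_mce_to_dict,
)
producer = SerializingProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "value.serializer": avro_serializer,
    }
)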
metadata-ingestion/src/datahub/metadata/com/linkedin/pegasus2avro/mxe/__init__.py
@@ -4,8 +4,10 @@
# Do not modify manually!

# fmt: off
+from .....schema_classes import MetadataAuditEventClass
from .....schema_classes import MetadataChangeEventClass


+MetadataAuditEvent = MetadataAuditEventClass
MetadataChangeEvent = MetadataChangeEventClass
# fmt: on