Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixes for union schema
  • Loading branch information
hsheth2 committed Jun 17, 2021
commit 3004b50b438340d76b9b6a33d6327ff6f560610a
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ sink:
config:
connection:
bootstrap: localhost:9092
schema_registry_url: http://localhost:8081
47 changes: 33 additions & 14 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@
import click
from avrogen import write_schema_files

# Python source text that generate() appends to the `schemas/__init__.py` file
# inside the output directory. The emitted code defines
# load_schema(schema_name), which reads `<schema_name>.avsc` from the directory
# containing that __init__.py and returns the file's text.
# NOTE(review): the `return` line inside this literal carries no indentation in
# this view; that looks like whitespace lost in extraction -- confirm against
# the real file before relying on the generated module being importable.
load_schema_method = """
import pathlib

def load_schema(schema_name: str) -> str:
return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()

"""

# Text that suppress_checks_in_file() writes at the very start of a file,
# before the file's previous contents: a `# flake8: noqa` marker, a notice that
# the file is autogenerated by this script, and an opening `# fmt: off` marker.
autogen_header = """# flake8: noqa

# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!

# fmt: off
"""
# Text that suppress_checks_in_file() writes after the file's previous
# contents; it is the `# fmt: on` counterpart of the `# fmt: off` line that
# ends the header.
autogen_footer = "# fmt: on\n"


def load_schema_file(schema_file: str) -> str:
with open(schema_file) as f:
Expand All @@ -31,7 +48,8 @@ def merge_schemas(schemas: List[str]) -> str:
# Deduplicate repeated names.
def Register(self, schema):
if schema.fullname in self._names:
print(f"deduping {schema.fullname}")
# print(f"deduping {schema.fullname}")
pass
else:
self._names[schema.fullname] = schema

Expand All @@ -50,15 +68,6 @@ def default(self, obj):
return encoded


autogen_header = """# flake8: noqa

# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!

# fmt: off
"""


def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
"""
Adds a couple lines to the top of an autogenerated file:
Expand All @@ -72,25 +81,35 @@ def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
f.seek(0, 0)
f.write(autogen_header)
f.write(contents)
f.write("# fmt: on\n")
f.write(autogen_footer)


@click.command()
@click.argument("schema_files", type=click.Path(exists=True), nargs=-1, required=True)
@click.argument("outdir", type=click.Path(), required=True)
def generate(schema_files: List[str], outdir: str) -> None:
schemas = []
schemas = {}
for schema_file in schema_files:
schema = load_schema_file(schema_file)
schemas.append(schema)
schemas[Path(schema_file).stem] = schema

merged_schema = merge_schemas(schemas)
merged_schema = merge_schemas(list(schemas.values()))

write_schema_files(merged_schema, outdir)
with open(f"{outdir}/__init__.py", "w"):
# Truncate this file.
pass

# Save raw schema files in codegen as well.
schema_save_dir = Path(outdir) / "schemas"
schema_save_dir.mkdir()
for schema_out_file, schema in schemas.items():
(schema_save_dir / f"{schema_out_file}.avsc").write_text(schema)

# Add load_schema method.
with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
schema_dir_init.write(load_schema_method)

# Add headers for all generated files
generated_files = Path(outdir).glob("**/*.py")
for file in generated_files:
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def get_long_description():
package_data={
"datahub": ["py.typed"],
"datahub.metadata": ["schema.avsc"],
"datahub.metadata.schemas": ["*.avsc"],
},
entry_points=entry_points,
# Dependencies.
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/kafka_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datahub.configuration.common import ConfigModel
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import SCHEMA_JSON_STR
from datahub.metadata.schemas import load_schema

DEFAULT_KAFKA_TOPIC = "MetadataChangeEvent_v4"

Expand Down Expand Up @@ -38,7 +38,7 @@ def convert_mce_to_dict(
return tuple_encoding

avro_serializer = AvroSerializer(
schema_str=SCHEMA_JSON_STR,
schema_str=load_schema("MetadataChangeEvent"),
schema_registry_client=schema_registry_client,
to_dict=convert_mce_to_dict,
)
Expand Down
Loading