Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec8a385
Create common AWS config
kevinhu Jun 23, 2021
d3bf612
Init sagemaker
kevinhu Jun 23, 2021
3db0a58
Common AWS dependencies
kevinhu Jun 23, 2021
d282559
Get features in feature group
kevinhu Jun 23, 2021
8f455c9
Ingest feature groups
kevinhu Jun 23, 2021
2bfb882
Add example ingestion config
kevinhu Jun 23, 2021
5bff9f4
Fix feature ingestion
kevinhu Jun 23, 2021
44ecb58
Append Glue data catalog source
kevinhu Jun 23, 2021
d660a9b
Handle primary key ingestion
kevinhu Jun 24, 2021
0259845
Init tests and stubs
kevinhu Jun 24, 2021
cd4d233
Add sagemaker golden
kevinhu Jun 24, 2021
4ff8434
Clean up golden
kevinhu Jun 24, 2021
8971109
Add descriptions and filter primary keys
kevinhu Jun 24, 2021
9133c85
Include custom fields in feature tables
kevinhu Jun 24, 2021
777f7df
Add sagemaker custom properties
kevinhu Jun 24, 2021
3722726
Merge
kevinhu Jun 24, 2021
149584a
Cleanup
kevinhu Jun 24, 2021
fb70c0b
Fix old references
kevinhu Jun 24, 2021
1c248c3
Add test stub with offline store
kevinhu Jun 24, 2021
3a4012e
Update custom properties
kevinhu Jun 24, 2021
3b575b1
Merge
kevinhu Jun 25, 2021
ffcd8cc
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 25, 2021
768393e
Refactor
kevinhu Jun 25, 2021
4bc4601
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 28, 2021
63841e4
Update comments
kevinhu Jun 28, 2021
30564cc
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 29, 2021
0bbe932
Merge
kevinhu Jun 29, 2021
8f96239
Fix imports order
kevinhu Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Ingest feature groups
  • Loading branch information
kevinhu committed Jun 23, 2021
commit 8f455c9184ecc86a4529796bdc76a6b51dbc9e96
76 changes: 75 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sagemaker.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from typing import Any, Dict, List
from typing import Any, Dict, Iterable, List

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.aws_common import AwsSourceConfig
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
MLFeatureSnapshot,
MLFeatureTableSnapshot,
MLPrimaryKeySnapshot,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
)


class SagemakerSourceConfig(AwsSourceConfig):
Expand Down Expand Up @@ -77,3 +92,62 @@ def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]:
next_token = feature_group.get("NextToken")

return feature_group

def get_feature_group_wu(
self, feature_group_details: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable workunit for a SageMaker feature group.

Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
"""

feature_group_name = feature_group_details["FeatureGroupName"]

feature_group_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("sagemaker", feature_group_name),
aspects=[],
)

feature_group_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
mlFeatures=[
builder.make_ml_feature_urn(
feature_group_name,
feature["FeatureName"],
)
for feature in feature_group_details["FeatureDefinitions"]
],
mlPrimaryKeys=[
builder.make_ml_primary_key_urn(
feature_group_name,
feature_group_details["RecordIdentifierFeatureName"],
)
],
)
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot)
return MetadataWorkUnit(id=feature_group_name, mce=mce)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:

feature_groups = self.get_all_feature_groups()

for feature_group in feature_groups:

feature_group_details = self.get_feature_group_details(
feature_group["FeatureGroupName"]
)

yield self.get_feature_group_wu(feature_group_details)

def get_report(self):
return self.report

def close(self):
pass