Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec8a385
Create common AWS config
kevinhu Jun 23, 2021
d3bf612
Init sagemaker
kevinhu Jun 23, 2021
3db0a58
Common AWS dependencies
kevinhu Jun 23, 2021
d282559
Get features in feature group
kevinhu Jun 23, 2021
8f455c9
Ingest feature groups
kevinhu Jun 23, 2021
2bfb882
Add example ingestion config
kevinhu Jun 23, 2021
5bff9f4
Fix feature ingestion
kevinhu Jun 23, 2021
44ecb58
Append Glue data catalog source
kevinhu Jun 23, 2021
d660a9b
Handle primary key ingestion
kevinhu Jun 24, 2021
0259845
Init tests and stubs
kevinhu Jun 24, 2021
cd4d233
Add sagemaker golden
kevinhu Jun 24, 2021
4ff8434
Clean up golden
kevinhu Jun 24, 2021
8971109
Add descriptions and filter primary keys
kevinhu Jun 24, 2021
9133c85
Include custom fields in feature tables
kevinhu Jun 24, 2021
777f7df
Add sagemaker custom properties
kevinhu Jun 24, 2021
3722726
Merge
kevinhu Jun 24, 2021
149584a
Cleanup
kevinhu Jun 24, 2021
fb70c0b
Fix old references
kevinhu Jun 24, 2021
1c248c3
Add test stub with offline store
kevinhu Jun 24, 2021
3a4012e
Update custom properties
kevinhu Jun 24, 2021
3b575b1
Merge
kevinhu Jun 25, 2021
ffcd8cc
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 25, 2021
768393e
Refactor
kevinhu Jun 25, 2021
4bc4601
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 28, 2021
63841e4
Update comments
kevinhu Jun 28, 2021
30564cc
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 29, 2021
0bbe932
Merge
kevinhu Jun 29, 2021
8f96239
Fix imports order
kevinhu Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add example ingestion config
  • Loading branch information
kevinhu committed Jun 23, 2021
commit 2bfb88235c3029391082e65d435f14a28da36e97
2 changes: 1 addition & 1 deletion metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ source:

Extracts:

- Feature groups (support for models and jobs coming soon!)
- Feature groups (support for models, jobs, and more coming soon!)

```yml
source:
Expand Down
9 changes: 9 additions & 0 deletions metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
source:
type: glue
config:
aws_region: "us-west-2"

sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
87 changes: 86 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def report_table_dropped(self, table: str) -> None:
self.filtered.append(table)


class GlueSource(Source):
class SagemakerSource(Source):
source_config: SagemakerSourceConfig
report = SagemakerSourceReport()

Expand Down Expand Up @@ -134,6 +134,89 @@ def get_feature_group_wu(
mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot)
return MetadataWorkUnit(id=feature_group_name, mce=mce)

def get_feature_type(self, aws_type: str, feature_name: str) -> str:

mapped_type = {
"String": MLFeatureDataType.TEXT,
"Integral": MLFeatureDataType.ORDINAL,
"Fractional": MLFeatureDataType.CONTINUOUS,
}.get(aws_type)

if mapped_type is None:
self.report.report_warning(
feature_name, f"unable to map type {aws_type} to metadata schema"
)
mapped_type = MLFeatureDataType.UNKNOWN

return mapped_type

def get_feature_wu(
self, feature_group_details: Dict[str, Any], feature: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeature workunit for a SageMaker feature.

Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
feature:
ingested SageMaker feature
"""

# create snapshot instance for the feature
feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[],
)

feature_sources = []

if "OfflineStoreConfig" in feature_group_details:

# remove S3 prefix (s3://)
s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"]["S3Uri"][5:]

if s3_name.endswith("/"):
s3_name = s3_name[:-1]

feature_sources.append(
builder.make_dataset_urn(
"s3",
s3_name,
self.source_config.env,
)
)

glue_database = feature_group_details["OfflineStoreConfig"]["DataCatalogConfig"]["Database"]
glue_table = feature_group_details["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"]

full_table_name = f"{node_args['database']}.{node_args['table_name']}"

# we know that the table will already be covered when ingesting Glue tables
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})"

# note that there's also an OnlineStoreConfig field, but this
# lack enough metadata to create a dataset
# (only specifies the security config and whether it's enabled at all)

# append feature name and type
feature_snapshot.aspects.append(
MLFeaturePropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
)
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
return MetadataWorkUnit(id=feature["FeatureName"], mce=mce)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:

feature_groups = self.get_all_feature_groups()
Expand All @@ -144,6 +227,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
feature_group["FeatureGroupName"]
)

for feature in feature_groups

yield self.get_feature_group_wu(feature_group_details)

def get_report(self):
Expand Down