Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
5fbcd31
Add get_model utils
kevinhu Jun 30, 2021
0c5aee0
Add job listing commands
kevinhu Jun 30, 2021
459a5fc
Init sagemaker processors
kevinhu Jun 30, 2021
98ca20b
Add process_training_job
kevinhu Jun 30, 2021
df90714
Add tuning jobs
kevinhu Jun 30, 2021
6eea1b3
Create SageMakerJob intermediate
kevinhu Jun 30, 2021
bf49145
Reorganize URN generators
kevinhu Jun 30, 2021
8763515
Construct arn-name translator
kevinhu Jul 1, 2021
233c00f
Refactor SageMaker job processors into class
kevinhu Jul 1, 2021
a089b2d
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 1, 2021
4cb0e5a
Switch to tuple-indexed ARNs
kevinhu Jul 1, 2021
02d3437
Add input/outputs for process_transform_job
kevinhu Jul 1, 2021
6807382
Comment out unsupported aspects
kevinhu Jul 1, 2021
c282a6b
process_labeling_job datasets
kevinhu Jul 1, 2021
b05e66b
process_training_job datasets
kevinhu Jul 1, 2021
a51b79f
Init status enums
kevinhu Jul 1, 2021
386f46d
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 1, 2021
d003991
Update models
kevinhu Jul 1, 2021
925b72f
Add unknown status enum
kevinhu Jul 1, 2021
854bf4b
Revise source report
kevinhu Jul 1, 2021
82d4425
Init job stubs
kevinhu Jul 2, 2021
b99025f
Init model stubs
kevinhu Jul 2, 2021
abaab8b
Add list-models stub
kevinhu Jul 2, 2021
5abd576
Set names and ARNs for job stubs
kevinhu Jul 2, 2021
77e9b9e
Add list-jobs stubs
kevinhu Jul 2, 2021
67442aa
Refactor stubbed job names and arns
kevinhu Jul 2, 2021
ad02711
Refactor job types
kevinhu Jul 2, 2021
8cd750e
Refactor job MCE constructor
kevinhu Jul 2, 2021
2cd0b85
Add feast and sagemaker dataplatforms
kevinhu Jul 2, 2021
77209d8
Furnish S3 paths in stubs
kevinhu Jul 2, 2021
bf63192
Add stubber responses
kevinhu Jul 2, 2021
856b251
Refactor and fix stub validation errors
kevinhu Jul 2, 2021
f570003
Setup model stubs
kevinhu Jul 2, 2021
a59ab89
Refactor feature group yielding
kevinhu Jul 2, 2021
2432357
Set up model ingestion
kevinhu Jul 2, 2021
1502e01
Parse in model creation times
kevinhu Jul 2, 2021
996c818
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 2, 2021
bfb46eb
Regenerate snapshots
kevinhu Jul 2, 2021
5449c7e
Move custom properties
kevinhu Jul 2, 2021
44b18fc
Generate model custom properties
kevinhu Jul 2, 2021
0fc259d
Fix job stubbing order
kevinhu Jul 2, 2021
f72d776
Working jobs ingestion
kevinhu Jul 2, 2021
7f30e86
Add custom properties
kevinhu Jul 2, 2021
3559015
Switch to sets for i/o jobs
kevinhu Jul 2, 2021
d204b76
Ingest input jobs
kevinhu Jul 2, 2021
ef8568b
Add job filtering options
kevinhu Jul 3, 2021
0ed1d73
Fix jobs filter and sort datasets
kevinhu Jul 3, 2021
e4797d0
Ingest datasets
kevinhu Jul 3, 2021
884631c
Ingest custom dataset properties
kevinhu Jul 3, 2021
d193e67
Typo fixes
kevinhu Jul 3, 2021
75f174a
Refactor reports
kevinhu Jul 6, 2021
8def468
Refactor out s3 URN constructor
kevinhu Jul 6, 2021
cba59c2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 6, 2021
27e921a
Remove unused
kevinhu Jul 6, 2021
ddb696a
Add env to models
kevinhu Jul 6, 2021
23c7541
Add umbrella flow and fix job envs
kevinhu Jul 6, 2021
bc80bff
Fix edge packaging stub
kevinhu Jul 6, 2021
24ab430
Set model sort order
kevinhu Jul 6, 2021
571e2b6
Comments for jobs
kevinhu Jul 6, 2021
e10592d
Fix time zones in stubs
kevinhu Jul 6, 2021
3584cfc
Create dataflow for each job
kevinhu Jul 6, 2021
4c44910
Set flows and migrate from azkaban enum
kevinhu Jul 7, 2021
8018022
Update rest sink test
kevinhu Jul 7, 2021
6675dd5
Set browse paths
kevinhu Jul 7, 2021
8e8eecd
Revert file to rest recipe
kevinhu Jul 7, 2021
d3b2e17
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 7, 2021
5331ba0
Browse paths for feature tables
kevinhu Jul 8, 2021
83fb6fd
Refactor make_s3_urn to aws_common
kevinhu Jul 8, 2021
76c7c7e
Add comment for deprecated azkaban types
kevinhu Jul 8, 2021
df56a61
Resolve merge conflict
kevinhu Jul 8, 2021
f98553b
Update schema_classes
kevinhu Jul 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor and fix stub validation errors
  • Loading branch information
kevinhu committed Jul 2, 2021
commit 856b2517177cf8226e1dc7b05df5857696b8c32d
21 changes: 6 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,14 @@ def s3_client(self):

@dataclass
class GlueSourceReport(SourceReport):
feature_groups_scanned = 0
features_scanned = 0
models_scanned = 0
jobs_scanned = 0
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)

# TODO: report these
def report_feature_group_scanned(self) -> None:
self.feature_groups_scanned += 1
def report_table_scanned(self) -> None:
self.tables_scanned += 1

def report_feature_scanned(self) -> None:
self.features_scanned += 1

def report_model_scanned(self) -> None:
self.models_scanned += 1

def report_job_scanned(self) -> None:
self.jobs_scanned += 1
def report_table_dropped(self, table: str) -> None:
self.filtered.append(table)


class GlueSource(Source):
Expand Down
322 changes: 31 additions & 291 deletions metadata-ingestion/src/datahub/ingestion/source/sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws_common import AwsSourceConfig
from datahub.ingestion.source.sagemaker_processors.feature_groups import (
FeatureGroupProcessor,
)
from datahub.ingestion.source.sagemaker_processors.jobs import SAGEMAKER_JOB_TYPES
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
Expand All @@ -30,14 +33,27 @@ def sagemaker_client(self):

@dataclass
class SagemakerSourceReport(SourceReport):
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)
feature_groups_scanned = 0
features_scanned = 0
models_scanned = 0
jobs_scanned = 0
datasets_scanned = 0

# TODO: report these
def report_feature_group_scanned(self) -> None:
self.feature_groups_scanned += 1

def report_feature_scanned(self) -> None:
self.features_scanned += 1

def report_table_scanned(self) -> None:
self.tables_scanned += 1
def report_model_scanned(self) -> None:
self.models_scanned += 1

def report_table_dropped(self, table: str) -> None:
self.filtered.append(table)
def report_job_scanned(self) -> None:
self.jobs_scanned += 1

def report_dataset_scanned(self) -> None:
self.datasets_scanned += 1


class SagemakerSource(Source):
Expand All @@ -56,304 +72,28 @@ def create(cls, config_dict, ctx):
config = SagemakerSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_all_feature_groups(self) -> List[Dict[str, Any]]:
"""
List all feature groups in SageMaker.
"""

feature_groups = []

# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_feature_groups
paginator = self.sagemaker_client.get_paginator("list_feature_groups")
for page in paginator.paginate():
feature_groups += page["FeatureGroupSummaries"]

return feature_groups

def get_all_models(self) -> List[Dict[str, Any]]:
"""
List all models in SageMaker.
"""

models = []

# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_models
paginator = self.sagemaker_client.get_paginator("list_models")
for page in paginator.paginate():
models += page["Models"]

return models

def get_all_jobs(
self,
) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, str]]:
"""
List all jobs in SageMaker.
"""

jobs = []

# dictionaries for translating between type-specific job names and ARNs
arn_to_name: Dict[str, Tuple[str, str]] = {}
name_to_arn: Dict[Tuple[str, str], str] = {}

for job_type, job_spec in SAGEMAKER_JOB_TYPES.items():

paginator = self.sagemaker_client.get_paginator(job_spec["list_command"])
for page in paginator.paginate():
page_jobs = page[job_spec["list_key"]]

for job in page_jobs:
job_name = (job_type, job_spec)
job_arn = job[job_spec["list_name_arn"]]

arn_to_name[job_arn] = job_name
name_to_arn[job_name] = job_arn

page_jobs = [{**job, "type": job_type} for job in page_jobs]

jobs += page_jobs

return jobs, arn_to_name, name_to_arn

def get_job_details(
self, job_name: str, describe_command: str, describe_name_key: str
) -> Dict[str, Any]:

return getattr(self.sagemaker_client, describe_command)(
**{describe_name_key: job_name}
)

def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]:
"""
Get details of a feature group (including list of component features).
"""

# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_feature_group
feature_group = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name
)

# use falsy fallback since AWS stubs require this to be a string in tests
next_token = feature_group.get("NextToken", "")

# paginate over feature group features
while next_token:
next_features = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name, NextToken=next_token
)
feature_group["FeatureDefinitions"].append(
next_features["FeatureDefinitions"]
)
next_token = feature_group.get("NextToken", "")

return feature_group

def get_model_details(self, model_name: str) -> Dict[str, Any]:
"""
Get details of a model.
"""

# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_model
return self.sagemaker_client.describe_model(ModelName=model_name)

def get_feature_group_wu(
self, feature_group_details: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable workunit for a SageMaker feature group.

Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
"""

feature_group_name = feature_group_details["FeatureGroupName"]

feature_group_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("sagemaker", feature_group_name),
aspects=[],
)

feature_group_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
description=feature_group_details.get("Description"),
# non-primary key features
mlFeatures=[
builder.make_ml_feature_urn(
feature_group_name,
feature["FeatureName"],
)
for feature in feature_group_details["FeatureDefinitions"]
if feature["FeatureName"]
!= feature_group_details["RecordIdentifierFeatureName"]
],
mlPrimaryKeys=[
builder.make_ml_primary_key_urn(
feature_group_name,
feature_group_details["RecordIdentifierFeatureName"],
)
],
# additional metadata
customProperties={
"arn": feature_group_details["FeatureGroupArn"],
"creation_time": str(feature_group_details["CreationTime"]),
"status": feature_group_details["FeatureGroupStatus"],
},
)
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot)
return MetadataWorkUnit(id=feature_group_name, mce=mce)

field_type_mappings = {
"String": MLFeatureDataType.TEXT,
"Integral": MLFeatureDataType.ORDINAL,
"Fractional": MLFeatureDataType.CONTINUOUS,
}

def get_feature_type(self, aws_type: str, feature_name: str) -> str:

mapped_type = self.field_type_mappings.get(aws_type)

if mapped_type is None:
self.report.report_warning(
feature_name, f"unable to map type {aws_type} to metadata schema"
)
mapped_type = MLFeatureDataType.UNKNOWN

return mapped_type

def get_feature_wu(
self, feature_group_details: Dict[str, Any], feature: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeature workunit for a SageMaker feature.

Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
feature:
ingested SageMaker feature
"""

# if the feature acts as the record identifier, then we ingest it as an MLPrimaryKey
# the RecordIdentifierFeatureName is guaranteed to exist as it's required on creation
is_record_identifier = (
feature_group_details["RecordIdentifierFeatureName"]
== feature["FeatureName"]
)

feature_sources = []

if "OfflineStoreConfig" in feature_group_details:

# remove S3 prefix (s3://)
s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"][
"S3Uri"
][5:]

if s3_name.endswith("/"):
s3_name = s3_name[:-1]

feature_sources.append(
builder.make_dataset_urn(
"s3",
s3_name,
self.source_config.env,
)
)

if "DataCatalogConfig" in feature_group_details["OfflineStoreConfig"]:

# if Glue catalog associated with offline store
glue_database = feature_group_details["OfflineStoreConfig"][
"DataCatalogConfig"
]["Database"]
glue_table = feature_group_details["OfflineStoreConfig"][
"DataCatalogConfig"
]["TableName"]

full_table_name = f"{glue_database}.{glue_table}"

self.report.report_warning(
full_table_name,
f"""Note: table {full_table_name} is an AWS Glue object.
To view full table metadata, run Glue ingestion
(see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""",
)

feature_sources.append(
f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.source_config.env})"
)

# note that there's also an OnlineStoreConfig field, but this
# lacks enough metadata to create a dataset
# (only specifies the security config and whether it's enabled at all)

# append feature name and type
if is_record_identifier:
primary_key_snapshot: MLPrimaryKeySnapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[
MLPrimaryKeyPropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
),
],
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=primary_key_snapshot)
else:
# create snapshot instance for the feature
feature_snapshot: MLFeatureSnapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[
MLFeaturePropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
)
],
)

# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:

return MetadataWorkUnit(
id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}',
mce=mce,
feature_group_processor = FeatureGroupProcessor(
sagemaker_client=self.sagemaker_client, env=self.env, report=self.report
)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:

feature_groups = self.get_all_feature_groups()
feature_groups = feature_group_processor.get_all_feature_groups()

for feature_group in feature_groups:

feature_group_details = self.get_feature_group_details(
feature_group_details = feature_group_processor.get_feature_group_details(
feature_group["FeatureGroupName"]
)

for feature in feature_group_details["FeatureDefinitions"]:
wu = self.get_feature_wu(feature_group_details, feature)
wu = feature_group_processor.get_feature_wu(
feature_group_details, feature
)
self.report.report_workunit(wu)
yield wu

wu = self.get_feature_group_wu(feature_group_details)
wu = feature_group_processor.get_feature_group_wu(feature_group_details)
self.report.report_workunit(wu)
yield wu

Expand Down
Loading