Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
5fbcd31
Add get_model utils
kevinhu Jun 30, 2021
0c5aee0
Add job listing commands
kevinhu Jun 30, 2021
459a5fc
Init sagemaker processors
kevinhu Jun 30, 2021
98ca20b
Add process_training_job
kevinhu Jun 30, 2021
df90714
Add tuning jobs
kevinhu Jun 30, 2021
6eea1b3
Create SageMakerJob intermediate
kevinhu Jun 30, 2021
bf49145
Reorganize URN generators
kevinhu Jun 30, 2021
8763515
Construct arn-name translator
kevinhu Jul 1, 2021
233c00f
Refactor SageMaker job processors into class
kevinhu Jul 1, 2021
a089b2d
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 1, 2021
4cb0e5a
Switch to tuple-indexed ARNs
kevinhu Jul 1, 2021
02d3437
Add input/outputs for process_transform_job
kevinhu Jul 1, 2021
6807382
Comment out unsupported aspects
kevinhu Jul 1, 2021
c282a6b
process_labeling_job datasets
kevinhu Jul 1, 2021
b05e66b
process_training_job datasets
kevinhu Jul 1, 2021
a51b79f
Init status enums
kevinhu Jul 1, 2021
386f46d
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 1, 2021
d003991
Update models
kevinhu Jul 1, 2021
925b72f
Add unknown status enum
kevinhu Jul 1, 2021
854bf4b
Revise source report
kevinhu Jul 1, 2021
82d4425
Init job stubs
kevinhu Jul 2, 2021
b99025f
Init model stubs
kevinhu Jul 2, 2021
abaab8b
Add list-models stub
kevinhu Jul 2, 2021
5abd576
Set names and ARNs for job stubs
kevinhu Jul 2, 2021
77e9b9e
Add list-jobs stubs
kevinhu Jul 2, 2021
67442aa
Refactor stubbed job names and arns
kevinhu Jul 2, 2021
ad02711
Refactor job types
kevinhu Jul 2, 2021
8cd750e
Refactor job MCE constructor
kevinhu Jul 2, 2021
2cd0b85
Add feast and sagemaker dataplatforms
kevinhu Jul 2, 2021
77209d8
Furnish S3 paths in stubs
kevinhu Jul 2, 2021
bf63192
Add stubber responses
kevinhu Jul 2, 2021
856b251
Refactor and fix stub validation errors
kevinhu Jul 2, 2021
f570003
Setup model stubs
kevinhu Jul 2, 2021
a59ab89
Refactor feature group yielding
kevinhu Jul 2, 2021
2432357
Set up model ingestion
kevinhu Jul 2, 2021
1502e01
Parse in model creation times
kevinhu Jul 2, 2021
996c818
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 2, 2021
bfb46eb
Regenerate snapshots
kevinhu Jul 2, 2021
5449c7e
Move custom properties
kevinhu Jul 2, 2021
44b18fc
Generate model custom properties
kevinhu Jul 2, 2021
0fc259d
Fix job stubbing order
kevinhu Jul 2, 2021
f72d776
Working jobs ingestion
kevinhu Jul 2, 2021
7f30e86
Add custom properties
kevinhu Jul 2, 2021
3559015
Switch to sets for i/o jobs
kevinhu Jul 2, 2021
d204b76
Ingest input jobs
kevinhu Jul 2, 2021
ef8568b
Add job filtering options
kevinhu Jul 3, 2021
0ed1d73
Fix jobs filter and sort datasets
kevinhu Jul 3, 2021
e4797d0
Ingest datasets
kevinhu Jul 3, 2021
884631c
Ingest custom dataset properties
kevinhu Jul 3, 2021
d193e67
Typo fixes
kevinhu Jul 3, 2021
75f174a
Refactor reports
kevinhu Jul 6, 2021
8def468
Refactor out s3 URN constructor
kevinhu Jul 6, 2021
cba59c2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 6, 2021
27e921a
Remove unused
kevinhu Jul 6, 2021
ddb696a
Add env to models
kevinhu Jul 6, 2021
23c7541
Add umbrella flow and fix job envs
kevinhu Jul 6, 2021
bc80bff
Fix edge packaging stub
kevinhu Jul 6, 2021
24ab430
Set model sort order
kevinhu Jul 6, 2021
571e2b6
Comments for jobs
kevinhu Jul 6, 2021
e10592d
Fix time zones in stubs
kevinhu Jul 6, 2021
3584cfc
Create dataflow for each job
kevinhu Jul 6, 2021
4c44910
Set flows and migrate from azkaban enum
kevinhu Jul 7, 2021
8018022
Update rest sink test
kevinhu Jul 7, 2021
6675dd5
Set browse paths
kevinhu Jul 7, 2021
8e8eecd
Revert file to rest recipe
kevinhu Jul 7, 2021
d3b2e17
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 7, 2021
5331ba0
Browse paths for feature tables
kevinhu Jul 8, 2021
83fb6fd
Refactor make_s3_urn to aws_common
kevinhu Jul 8, 2021
76c7c7e
Add comment for deprecated azkaban types
kevinhu Jul 8, 2021
df56a61
Resolve merge conflict
kevinhu Jul 8, 2021
f98553b
Update schema_classes
kevinhu Jul 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix job stubbing order
  • Loading branch information
kevinhu committed Jul 2, 2021
commit 0fc259d5f184d9d5583cb32ef341b3e7c63f1127
8 changes: 4 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
)
yield from model_processor.get_workunits()

# job_processor = JobProcessor(
# sagemaker_client=self.sagemaker_client, env=self.env, report=self.report
# )
# yield from job_processor.get_workunits()
job_processor = JobProcessor(
sagemaker_client=self.sagemaker_client, env=self.env, report=self.report
)
yield from job_processor.get_workunits()

def get_report(self):
    """
    Return the report object for this source.

    This is the same ``self.report`` instance that is handed to the
    processors in ``get_workunits``, so it reflects every work unit they
    have reported so far.
    """
    # was ``self.reporte``: no such attribute exists, so every call raised
    # AttributeError instead of returning the report
    return self.report
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple

from datahub.emitter import mce_builder
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DataJobInfoClass,
DataJobSnapshotClass,
Expand All @@ -22,6 +23,7 @@ class SageMakerJobType:
describe_arn_key: str
describe_status_key: str
status_map: Dict[str, str]
processor: str


SAGEMAKER_JOB_TYPES = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! this is a pretty clean solution, all things considered

Expand All @@ -43,6 +45,7 @@ class SageMakerJobType:
"Stopped": JobStatusClass.STOPPED,
"Stopping": JobStatusClass.STOPPING,
},
processor="process_auto_ml_job",
),
"compilation": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_compilation_jobs
Expand All @@ -63,6 +66,7 @@ class SageMakerJobType:
"STOPPING": JobStatusClass.STOPPING,
"STOPPED": JobStatusClass.STOPPED,
},
processor="process_compilation_job",
),
"edge_packaging": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_edge_packaging_jobs
Expand All @@ -83,6 +87,7 @@ class SageMakerJobType:
"STOPPING": JobStatusClass.STOPPING,
"STOPPED": JobStatusClass.STOPPED,
},
processor="process_edge_packaging_job",
),
"hyper_parameter_tuning": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_hyper_parameter_tuning_jobs
Expand All @@ -102,6 +107,7 @@ class SageMakerJobType:
"Stopping": JobStatusClass.STOPPING,
"Stopped": JobStatusClass.STOPPED,
},
processor="process_hyper_parameter_tuning_job",
),
"labeling": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_labeling_jobs
Expand All @@ -122,6 +128,7 @@ class SageMakerJobType:
"Stopping": JobStatusClass.STOPPING,
"Stopped": JobStatusClass.STOPPED,
},
processor="process_labeling_job",
),
"processing": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_processing_jobs
Expand All @@ -141,6 +148,7 @@ class SageMakerJobType:
"Stopping": JobStatusClass.STOPPING,
"Stopped": JobStatusClass.STOPPED,
},
processor="process_processing_job",
),
"training": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_training_jobs
Expand All @@ -160,6 +168,7 @@ class SageMakerJobType:
"Stopping": JobStatusClass.STOPPING,
"Stopped": JobStatusClass.STOPPED,
},
processor="process_training_job",
),
"transform": SageMakerJobType(
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_transform_jobs
Expand All @@ -179,6 +188,7 @@ class SageMakerJobType:
"Stopping": JobStatusClass.STOPPING,
"Stopped": JobStatusClass.STOPPED,
},
processor="process_transform_job",
),
}

Expand Down Expand Up @@ -213,7 +223,7 @@ def make_sagemaker_job_urn(arn) -> str:

@dataclass
class SageMakerJob:
job: MetadataChangeEventClass
job_mce: MetadataChangeEventClass
input_datasets: Dict[str, Dict[str, Any]] = field(default_factory=dict)
output_datasets: Dict[str, Dict[str, Any]] = field(default_factory=dict)
input_jobs: List[str] = field(default_factory=list)
Expand All @@ -232,35 +242,38 @@ class JobProcessor:

def get_all_jobs(
self,
) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, str]]:
) -> List[Dict[str, Any]]:
"""
List all jobs in SageMaker.
"""

jobs = []

# dictionaries for translating between type-specific job names and ARNs
arn_to_name: Dict[str, Tuple[str, str]] = {}
name_to_arn: Dict[Tuple[str, str], str] = {}
self.arn_to_name: Dict[str, Tuple[str, str]] = {}
self.name_to_arn: Dict[Tuple[str, str], str] = {}

for job_type, job_spec in SAGEMAKER_JOB_TYPES.items():
# iterate through keys in sorted order for consistency
for job_type in sorted(SAGEMAKER_JOB_TYPES):

paginator = self.sagemaker_client.get_paginator(job_spec["list_command"])
job_spec = SAGEMAKER_JOB_TYPES[job_type]

paginator = self.sagemaker_client.get_paginator(job_spec.list_command)
for page in paginator.paginate():
page_jobs = page[job_spec["list_key"]]
page_jobs = page[job_spec.list_key]

for job in page_jobs:
job_name = (job_type, job_spec)
job_arn = job[job_spec["list_name_arn"]]
job_name = (job_type, job[job_spec.list_name_key])
job_arn = job[job_spec.list_arn_key]

arn_to_name[job_arn] = job_name
name_to_arn[job_name] = job_arn
self.arn_to_name[job_arn] = job_name
self.name_to_arn[job_name] = job_arn

page_jobs = [{**job, "type": job_type} for job in page_jobs]

jobs += page_jobs

return jobs, arn_to_name, name_to_arn
return jobs

def get_job_details(
self, job_name: str, describe_command: str, describe_name_key: str
Expand All @@ -270,6 +283,40 @@ def get_job_details(
**{describe_name_key: job_name}
)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """
    Generate one work unit for every SageMaker job.

    Each job returned by ``get_all_jobs`` is described through
    ``get_job_details``, handed to the ``process_*`` method named by the
    ``processor`` field of its ``SAGEMAKER_JOB_TYPES`` entry, and the MCE of
    the resulting ``SageMakerJob`` is wrapped in a ``MetadataWorkUnit``.
    Every work unit is registered with ``self.report`` before it is yielded.
    """

    for job in self.get_all_jobs():

        # "type" is the SAGEMAKER_JOB_TYPES key attached to each job by get_all_jobs
        job_spec = SAGEMAKER_JOB_TYPES[job["type"]]
        job_name = job[job_spec.list_name_key]

        details = self.get_job_details(
            job_name,
            job_spec.describe_command,
            job_spec.describe_name_key,
        )

        # dispatch by name to the type-specific processor method on this class
        process = getattr(self, job_spec.processor)
        sagemaker_job = process(details)

        workunit = MetadataWorkUnit(
            id=f'{job["type"]}-{job_name}',
            mce=sagemaker_job.job_mce,
        )
        self.report.report_workunit(workunit)
        yield workunit

def create_common_job_mce(
self,
job: Dict[str, Any],
Expand Down Expand Up @@ -322,15 +369,6 @@ def process_auto_ml_job(self, job) -> SageMakerJob:

JOB_TYPE = "auto_ml"

# TODO: figure out what to do with these attributes
# status: str = job["AutoMLJobStatus"]

# role: str = job["RoleArn"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")
# end_time: Optional[datetime] = job.get("Endtime")

input_data: Optional[Dict[str, str]] = (
job["InputDataConfig"].get("DataSource", {}).get("S3DataSource")
)
Expand Down Expand Up @@ -360,7 +398,9 @@ def process_auto_ml_job(self, job) -> SageMakerJob:
)

return SageMakerJob(
job=job_mce, input_datasets=input_datasets, output_datasets=output_datasets
job_mce=job_mce,
input_datasets=input_datasets,
output_datasets=output_datasets,
)

def process_compilation_job(self, job) -> SageMakerJob:
Expand All @@ -372,14 +412,6 @@ def process_compilation_job(self, job) -> SageMakerJob:
"""

JOB_TYPE = "compilation"
# status: str = job["CompilationJobStatus"]

# role: str = job["RoleArn"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")
# start_time: Optional[datetime] = job.get("CompilationStartTime")
# end_time: Optional[datetime] = job.get("CompilationEndTime")

input_datasets = {}

Expand Down Expand Up @@ -411,7 +443,9 @@ def process_compilation_job(self, job) -> SageMakerJob:
)

return SageMakerJob(
job=job_mce, input_datasets=input_datasets, output_datasets=output_datasets
job_mce=job_mce,
input_datasets=input_datasets,
output_datasets=output_datasets,
)

def process_edge_packaging_job(
Expand All @@ -429,13 +463,6 @@ def process_edge_packaging_job(

name: str = job["EdgePackagingJobName"]
arn: str = job["EdgePackagingJobArn"]
# status: str = job["EdgePackagingJobStatus"]
# status_message: str = job["EdgePackagingJobStatusMessage"]

# role: str = job["RoleArn"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")

output_datasets = {}

Expand Down Expand Up @@ -485,7 +512,7 @@ def process_edge_packaging_job(
)

return SageMakerJob(
job=job_mce, output_datasets=output_datasets, output_jobs=output_jobs
job_mce=job_mce, output_datasets=output_datasets, output_jobs=output_jobs
)

def process_hyper_parameter_tuning_job(
Expand All @@ -503,13 +530,6 @@ def process_hyper_parameter_tuning_job(

name: str = job["HyperParameterTuningJobName"]
arn: str = job["HyperParameterTuningJobArn"]
# status: str = job["HyperParameterTuningJobStatus"]

# role: str = job["RoleArn"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")
# end_time: Optional[datetime] = job.get("HyperParameterTuningEndTime")

training_jobs = []

Expand All @@ -533,7 +553,7 @@ def process_hyper_parameter_tuning_job(
)

return SageMakerJob(
job=job_mce,
job_mce=job_mce,
output_jobs=training_jobs,
)

Expand All @@ -546,16 +566,6 @@ def process_labeling_job(self, job) -> SageMakerJob:
"""

JOB_TYPE = "labeling"
# status: str = job["LabelingJobStatus"]

# role: str = job["RoleArn"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")

# attribute: str = job["LabelAttributeName"]

# tags: List[Dict[str, str]] = job["Tags"]

input_datasets = {}

Expand Down Expand Up @@ -602,7 +612,7 @@ def process_labeling_job(self, job) -> SageMakerJob:
)

return SageMakerJob(
job=job_mce,
job_mce=job_mce,
input_datasets=input_datasets,
output_datasets=output_datasets,
)
Expand All @@ -616,14 +626,6 @@ def process_processing_job(self, job) -> SageMakerJob:
"""

JOB_TYPE = "processing"
# status: str = job["ProcessingJobStatus"]

# role: str = job["RoleArn"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")
# start_time: Optional[datetime] = job.get("ProcessingStartTime")
# end_time: Optional[datetime] = job.get("ProcessingEndTime")

input_jobs = []

Expand Down Expand Up @@ -705,7 +707,7 @@ def process_processing_job(self, job) -> SageMakerJob:
)

return SageMakerJob(
job=job_mce,
job_mce=job_mce,
input_datasets=input_datasets,
input_jobs=input_jobs,
)
Expand All @@ -719,15 +721,6 @@ def process_training_job(self, job) -> SageMakerJob:
"""

JOB_TYPE = "training"
# status: str = job["TrainingJobStatus"]
# secondary_status = job["SecondaryStatus"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")
# start_time: Optional[datetime] = job.get("TrainingStartTime")
# end_time: Optional[datetime] = job.get("TrainingEndTime")

# hyperparameters = job.get("HyperParameters", {})

input_datasets = {}

Expand Down Expand Up @@ -791,7 +784,7 @@ def process_training_job(self, job) -> SageMakerJob:
)

return SageMakerJob(
job=job_mce,
job_mce=job_mce,
input_datasets=input_datasets,
output_datasets=output_datasets,
)
Expand All @@ -805,12 +798,6 @@ def process_transform_job(self, job) -> SageMakerJob:
"""

JOB_TYPE = "transform"
# status: str = job["TransformJobStatus"]

# create_time: Optional[datetime] = job.get("CreationTime")
# last_modified_time: Optional[datetime] = job.get("LastModifiedTime")
# start_time: Optional[datetime] = job.get("TransformStartTime")
# end_time: Optional[datetime] = job.get("TransformEndTime")

job_input = job.get("TransformInput", {})
input_s3 = job_input.get("DataSource", {}).get("S3DataSource", {})
Expand Down Expand Up @@ -855,7 +842,7 @@ def process_transform_job(self, job) -> SageMakerJob:
)

return SageMakerJob(
job=job_mce,
job_mce=job_mce,
input_datasets=input_datasets,
output_datasets=output_datasets,
input_jobs=input_jobs,
Expand Down
Loading