Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b00cc28
add sub component type
tmonty12 Sep 19, 2025
433c71e
add testing
tmonty12 Sep 19, 2025
11f9e59
planner using prometheus env var endpoint
tmonty12 Sep 22, 2025
8eb0665
planner uses subComponentType and testingt
tmonty12 Sep 23, 2025
ea84c64
use deployment validation error
tmonty12 Sep 23, 2025
0a71b68
add back copyright header
tmonty12 Sep 23, 2025
3ec215d
backwards compatibility with framework component name
tmonty12 Sep 23, 2025
411c02f
update pre deployment profiling to use subComponentType
tmonty12 Sep 24, 2025
c59dd08
small comments
tmonty12 Sep 24, 2025
8e5dfda
update planner manifest to remove prometheus svc and use subComponent…
tmonty12 Sep 24, 2025
69bc1a9
update sla planner deployment docs
tmonty12 Sep 24, 2025
7dc6696
fix doc link
tmonty12 Sep 24, 2025
752b4ac
update profiler config and fix ci
tmonty12 Sep 24, 2025
0761925
small fixes
tmonty12 Sep 24, 2025
51782ff
more small fixes
tmonty12 Sep 24, 2025
3cc6677
revert changes to profiler - will do so in follow on PR
tmonty12 Sep 24, 2025
88b4181
args not optional
tmonty12 Sep 24, 2025
4da6983
small docs update
tmonty12 Sep 24, 2025
20504a2
properly parse prometheus metrics
tmonty12 Sep 26, 2025
3661f5d
fix ci
tmonty12 Sep 26, 2025
74e054c
fix virtual_connector
tmonty12 Sep 26, 2025
823327d
fix mypy
tmonty12 Sep 26, 2025
2c85f2d
remove prometheus server
tmonty12 Sep 26, 2025
0b54bca
pc
tedzhouhk Sep 26, 2025
15d524e
add subComponentType, remove prometheus installation, remove service …
tmonty12 Sep 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
properly parse prometheus metrics
Signed-off-by: tmontfort <[email protected]>
  • Loading branch information
tmonty12 committed Sep 26, 2025
commit 20504a248acf6f401505c3654211994261c6ebfb
101 changes: 95 additions & 6 deletions components/planner/src/dynamo/planner/kubernetes_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@
import logging
import os
from enum import Enum
import shlex
from typing import Optional

from pydantic import BaseModel

from dynamo.planner.kube import KubernetesAPI
from dynamo.planner.planner_connector import PlannerConnector
from dynamo.planner.utils.exceptions import (
BackendFrameworkInvalidError,
BackendFrameworkNotFoundError,
ComponentError,
DeploymentValidationError,
DuplicateSubComponentError,
EmptyTargetReplicasError,
DeploymentModelNameMismatchError,
ModelNameNotFoundError,
PlannerError,
SubComponentNotFoundError,
UserProvidedModelNameMismatchError,
)
from dynamo.runtime.logging import configure_dynamo_logging

Expand All @@ -46,6 +53,32 @@ class Service(BaseModel):

def number_replicas(self) -> int:
return self.service.get("replicas", 0)

def get_model_name(self) -> Optional[str]:
    """Return the model name declared in the main container's arguments.

    The arguments under ``extraPodSpec.mainContainer.args`` are tokenized
    with ``break_arguments``. ``--served-model-name`` takes precedence over
    ``--model``; a flag that is the very last token (no value after it) is
    ignored.

    Returns:
        The token that follows the first usable flag, or ``None`` when
        neither flag is followed by a value.
    """
    main_container = self.service.get("extraPodSpec", {}).get("mainContainer", {})
    tokens = break_arguments(main_container.get("args", []))

    # Flags are listed in priority order.
    for flag in ("--served-model-name", "--model"):
        if flag not in tokens:
            continue
        value_index = tokens.index(flag) + 1
        if value_index < len(tokens):
            return tokens[value_index]

    return None


def break_arguments(args: list[str] | None) -> list[str]:
ans: list[str] = []
if args is None:
return ans
if isinstance(args, str):
# Use shlex.split to properly handle quoted arguments and JSON values
ans = shlex.split(args)
else:
for arg in args:
if arg is not None:
# Use shlex.split to properly handle quoted arguments
ans.extend(shlex.split(arg))
return ans


class TargetReplica(BaseModel):
Expand All @@ -55,9 +88,13 @@ class TargetReplica(BaseModel):


class KubernetesConnector(PlannerConnector):
def __init__(self, dynamo_namespace: str, k8s_namespace: Optional[str] = None):
def __init__(self, dynamo_namespace: str, model_name: Optional[str] = None, k8s_namespace: Optional[str] = None):
self.kube_api = KubernetesAPI(k8s_namespace)
self.dynamo_namespace = dynamo_namespace

if model_name:
self.model_name = model_name.lower() # normalize model name to lowercase (MDC)
else:
self.model_name = None

graph_deployment_name = os.getenv("DYN_PARENT_DGD_K8S_NAME")
if not graph_deployment_name:
Expand Down Expand Up @@ -108,16 +145,17 @@ async def remove_component(
self.graph_deployment_name,
)

async def verify_prefill_and_decode_components_exist(
async def validate_deployment(
self,
prefill_component_name: Optional[str] = None,
decode_component_name: Optional[str] = None,
):
"""
Verify that the deployment contains services with subComponentType prefill and decode.
Verify that the deployment contains services with subComponentType prefill and decode and the model name exists.
Will fallback to worker service names for backwards compatibility. (TODO: deprecate)

Raises:
DynamoGraphDeploymentNotFoundError: If the deployment is not found
DeploymentValidationError: If the deployment does not contain services with subComponentType prefill and decode
"""
deployment = self.kube_api.get_graph_deployment(self.graph_deployment_name)
Expand All @@ -130,7 +168,7 @@ async def verify_prefill_and_decode_components_exist(
SubComponentType.PREFILL,
component_name=prefill_component_name,
)
except ComponentError as e:
except PlannerError as e:
errors.append(str(e))

try:
Expand All @@ -139,13 +177,64 @@ async def verify_prefill_and_decode_components_exist(
SubComponentType.DECODE,
component_name=decode_component_name,
)
except ComponentError as e:
except PlannerError as e:
errors.append(str(e))

try:
self.get_model_name(deployment)
except PlannerError as e:
errors.append(str(e))

# Raise combined error if any issues found
if errors:
raise DeploymentValidationError(errors)

def get_model_name(self, deployment: Optional[dict] = None) -> str:
    """Resolve the model name served by the graph deployment.

    The name is read from the prefill and decode services of the deployment.
    If that lookup fails with a ``PlannerError`` and a model name was given
    to the connector, the given name is returned instead.

    Args:
        deployment: Deployment to inspect. When ``None`` it is fetched via
            ``self.kube_api.get_graph_deployment`` using
            ``self.graph_deployment_name``.

    Returns:
        The model name found in the deployment, or ``self.model_name`` when
        the deployment lookup failed and a name was provided.

    Raises:
        ModelNameNotFoundError: Neither service declares a model name and no
            model name was provided.
        DeploymentModelNameMismatchError: Prefill and decode declare
            different model names and no model name was provided.
        UserProvidedModelNameMismatchError: The deployment's model name
            differs (ignoring case) from the provided one.
        PlannerError: Any other ``PlannerError`` raised during the lookup is
            re-raised when no model name was provided.
    """
    try:
        if deployment is None:
            deployment = self.kube_api.get_graph_deployment(
                self.graph_deployment_name
            )

        # TODO: benchmarks/profiler/utils/config.py already contains DGD config parsing
        # and model name logic, should consolidate
        prefill_service = self.get_service_from_sub_component_type_or_name(
            deployment,
            SubComponentType.PREFILL,
        )
        decode_service = self.get_service_from_sub_component_type_or_name(
            deployment,
            SubComponentType.DECODE,
        )
        prefill_model_name = prefill_service.get_model_name()
        decode_model_name = decode_service.get_model_name()

        if prefill_model_name is None and decode_model_name is None:
            raise ModelNameNotFoundError()

        # Use whichever side declares a name; if both do, they must agree.
        if prefill_model_name is None:
            model_name = decode_model_name
        elif decode_model_name is None:
            model_name = prefill_model_name
        elif prefill_model_name != decode_model_name:
            raise DeploymentModelNameMismatchError(
                prefill_model_name, decode_model_name
            )
        else:
            model_name = prefill_model_name

    except PlannerError as e:
        if not self.model_name:
            # Nothing to fall back on: propagate with the original traceback.
            raise
        logger.warning(f"Failed to get model name from deployment with error: {e}, using provided model name: {self.model_name}")
        return self.model_name

    # The provided name is stored lowercased by __init__, while the
    # deployment name keeps its original casing, so compare ignoring case to
    # avoid a spurious mismatch.
    if self.model_name and model_name.lower() != self.model_name.lower():
        raise UserProvidedModelNameMismatchError(model_name, self.model_name)

    return model_name

async def wait_for_deployment_ready(
self,
max_attempts: int = 180, # default: 30 minutes total
Expand Down
53 changes: 53 additions & 0 deletions components/planner/src/dynamo/planner/utils/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,59 @@ class ComponentError(PlannerError):

pass

class ModelNameNotFoundError(PlannerError):
    """Signals that no model name could be found in the deployment."""

    def __init__(self):
        # Fixed message: there is no per-instance detail to report.
        super().__init__("Model name not found in DynamoGraphDeployment")


class DeploymentModelNameMismatchError(PlannerError):
    """Signals that prefill and decode declare different model names.

    Attributes:
        prefill_model_name: Model name passed for the prefill side.
        decode_model_name: Model name passed for the decode side.
        message: Human-readable description passed to the base exception.
    """

    def __init__(self, prefill_model_name: str, decode_model_name: str):
        # Keep both names on the instance so handlers can inspect them.
        self.prefill_model_name, self.decode_model_name = (
            prefill_model_name,
            decode_model_name,
        )
        self.message = f"Model name mismatch in DynamoGraphDeployment: prefill model name {prefill_model_name} != decode model name {decode_model_name}"
        super().__init__(self.message)

class UserProvidedModelNameMismatchError(PlannerError):
    """Signals that a model name differs from the user-provided one.

    Attributes:
        model_name: The model name that was compared.
        user_provided_model_name: The model name the user expected.
        message: Human-readable description passed to the base exception.
    """

    def __init__(self, model_name: str, user_provided_model_name: str):
        # Keep both names on the instance so handlers can inspect them.
        self.model_name, self.user_provided_model_name = (
            model_name,
            user_provided_model_name,
        )
        self.message = f"Model name {model_name} does not match expected model name {user_provided_model_name}"
        super().__init__(self.message)


class BackendFrameworkNotFoundError(PlannerError):
    """Raised when no backend framework is found on the DynamoGraphDeployment.

    Takes no arguments; the error message is fixed.
    """

    def __init__(self):
        message = "Backend framework not found on DynamoGraphDeployment"
        super().__init__(message)

class BackendFrameworkInvalidError(PlannerError):
    """Raised when the given backend framework is invalid.

    Attributes:
        backend_framework: The rejected backend framework value, also
            embedded in the error message.
    """

    def __init__(self, backend_framework: str):
        # Keep the offending value on the instance so handlers can inspect it.
        self.backend_framework = backend_framework

        message = f"Backend framework {backend_framework} is invalid"
        super().__init__(message)


class SubComponentNotFoundError(ComponentError):
"""Raised when a required subComponentType is not found in the deployment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
default=SLAPlannerDefaults.no_correction,
help="Disable correction factor",
)
parser.add_argument(
"--model-name",
type=str,
help="Model name of deployment (only required for virtual environment)",
)
return parser
40 changes: 27 additions & 13 deletions components/planner/src/dynamo/planner/utils/planner_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,26 @@ def __init__(
self.args = args
self.dryrun = dryrun

# Rely on getting model name from connector
self.model_name: Optional[str] = None

if not self.dryrun:
self.runtime = runtime
self.namespace = args.namespace

if not args.no_operation:
if args.environment == "kubernetes":
self.connector = KubernetesConnector(self.namespace)
self.connector = KubernetesConnector(self.namespace, self.model_name)
elif args.environment == "virtual":
self.connector = VirtualConnector(
runtime, self.namespace, args.backend
runtime, self.namespace, args.model_name,
)
else:
raise ValueError(f"Invalid environment: {args.environment}")

self.prometheus_api_client = PrometheusAPIClient(
SLAPlannerDefaults.prometheus_endpoint
SLAPlannerDefaults.prometheus_endpoint,
args.namespace,
)

self.num_req_predictor = LOAD_PREDICTORS[args.load_predictor](
Expand Down Expand Up @@ -242,27 +246,33 @@ async def observe_metrics(self):
self.num_d_workers_gauge.set(len(self.d_endpoints))

self.last_metrics.ttft = self.prometheus_api_client.get_avg_time_to_first_token(
f"{self.args.adjustment_interval}s"
f"{self.args.adjustment_interval}s",
self.model_name,
)
self.last_metrics.itl = self.prometheus_api_client.get_avg_inter_token_latency(
f"{self.args.adjustment_interval}s"
f"{self.args.adjustment_interval}s",
self.model_name,
)
self.last_metrics.num_req = self.prometheus_api_client.get_avg_request_count(
f"{self.args.adjustment_interval}s"
f"{self.args.adjustment_interval}s",
self.model_name,
)
self.last_metrics.request_duration = (
self.prometheus_api_client.get_avg_request_duration(
f"{self.args.adjustment_interval}s"
f"{self.args.adjustment_interval}s",
self.model_name,
)
)
self.last_metrics.isl = (
self.prometheus_api_client.get_avg_input_sequence_tokens(
f"{self.args.adjustment_interval}s"
f"{self.args.adjustment_interval}s",
self.model_name,
)
)
self.last_metrics.osl = (
self.prometheus_api_client.get_avg_output_sequence_tokens(
f"{self.args.adjustment_interval}s"
f"{self.args.adjustment_interval}s",
self.model_name,
)
)

Expand Down Expand Up @@ -459,19 +469,23 @@ async def run(self):
"""Main loop for the planner"""

if not self.args.no_operation:
# Fail fast if the deployment does not contain prefill and decode components
logger.info("Verifying prefill and decode components exist...")
# Fail fast if the deployment is not valid
logger.info("Validating deployment...")

# TODO: still supporting framework component names for backwards compatibility
# Should be deprecated in favor of service subComponentType
await self.connector.verify_prefill_and_decode_components_exist(
await self.connector.validate_deployment(
prefill_component_name=self.prefill_component_name,
decode_component_name=self.decode_component_name,
)
logger.info("Successfully verified prefill and decode components exist")
logger.info("Successfully validated the deployment")

await self.connector.wait_for_deployment_ready()

model_name = self.connector.get_model_name()
logger.info(f"Detected model name from deployment: {model_name}")
self.model_name = model_name.lower() # normalize model name to lowercase (MDC)

self.last_adjustment_time = time.time()

while True:
Expand Down
Loading
Loading