Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b00cc28
add sub component type
tmonty12 Sep 19, 2025
433c71e
add testing
tmonty12 Sep 19, 2025
11f9e59
planner using prometheus env var endpoint
tmonty12 Sep 22, 2025
8eb0665
planner uses subComponentType and testingt
tmonty12 Sep 23, 2025
ea84c64
use deployment validation error
tmonty12 Sep 23, 2025
0a71b68
add back copyright header
tmonty12 Sep 23, 2025
3ec215d
backwards compatibility with framework component name
tmonty12 Sep 23, 2025
411c02f
update pre deployment profiling to use subComponentType
tmonty12 Sep 24, 2025
c59dd08
small comments
tmonty12 Sep 24, 2025
8e5dfda
update planner manifest to remove prometheus svc and use subComponent…
tmonty12 Sep 24, 2025
69bc1a9
update sla planner deployment docs
tmonty12 Sep 24, 2025
7dc6696
fix doc link
tmonty12 Sep 24, 2025
752b4ac
update profiler config and fix ci
tmonty12 Sep 24, 2025
0761925
small fixes
tmonty12 Sep 24, 2025
51782ff
more small fixes
tmonty12 Sep 24, 2025
3cc6677
revert changes to profiler - will do so in follow on PR
tmonty12 Sep 24, 2025
88b4181
args not optional
tmonty12 Sep 24, 2025
4da6983
small docs update
tmonty12 Sep 24, 2025
20504a2
properly parse prometheus metrics
tmonty12 Sep 26, 2025
3661f5d
fix ci
tmonty12 Sep 26, 2025
74e054c
fix virtual_connector
tmonty12 Sep 26, 2025
823327d
fix mypy
tmonty12 Sep 26, 2025
2c85f2d
remove prometheus server
tmonty12 Sep 26, 2025
0b54bca
pc
tedzhouhk Sep 26, 2025
15d524e
add subComponentType, remove prometheus installation, remove service …
tmonty12 Sep 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix ci
  • Loading branch information
tmonty12 committed Sep 26, 2025
commit 3661f5d7f11c0aa7c892f767862d2b4a84213909
59 changes: 35 additions & 24 deletions components/planner/src/dynamo/planner/kubernetes_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@

import logging
import os
from enum import Enum
import shlex
from enum import Enum
from typing import Optional

from pydantic import BaseModel

from dynamo.planner.kube import KubernetesAPI
from dynamo.planner.planner_connector import PlannerConnector
from dynamo.planner.utils.exceptions import (
BackendFrameworkInvalidError,
BackendFrameworkNotFoundError,
ComponentError,
DeploymentModelNameMismatchError,
DeploymentValidationError,
DuplicateSubComponentError,
EmptyTargetReplicasError,
DeploymentModelNameMismatchError,
ModelNameNotFoundError,
PlannerError,
SubComponentNotFoundError,
Expand All @@ -53,18 +50,25 @@ class Service(BaseModel):

def number_replicas(self) -> int:
return self.service.get("replicas", 0)

def get_model_name(self) -> Optional[str]:
args = self.service.get("extraPodSpec", {}).get("mainContainer", {}).get("args", [])
args = (
self.service.get("extraPodSpec", {})
.get("mainContainer", {})
.get("args", [])
)

args = break_arguments(args)
if "--served-model-name" in args and len(args) > args.index("--served-model-name") + 1:
if (
"--served-model-name" in args
and len(args) > args.index("--served-model-name") + 1
):
return args[args.index("--served-model-name") + 1]
if "--model" in args and len(args) > args.index("--model") + 1:
return args[args.index("--model") + 1]

return None


def break_arguments(args: list[str] | None) -> list[str]:
ans: list[str] = []
Expand All @@ -88,11 +92,18 @@ class TargetReplica(BaseModel):


class KubernetesConnector(PlannerConnector):
def __init__(self, dynamo_namespace: str, model_name: Optional[str] = None, k8s_namespace: Optional[str] = None):
def __init__(
self,
dynamo_namespace: str,
model_name: Optional[str] = None,
k8s_namespace: Optional[str] = None,
):
self.kube_api = KubernetesAPI(k8s_namespace)

if model_name:
self.model_name = model_name.lower() # normalize model name to lowercase (MDC)
self.model_name = (
model_name.lower()
) # normalize model name to lowercase (MDC)
else:
self.model_name = None

Expand Down Expand Up @@ -190,12 +201,14 @@ async def validate_deployment(
raise DeploymentValidationError(errors)

def get_model_name(self, deployment: Optional[dict] = None) -> str:
"""Get the model name from the deployment"""
"""Get the model name from the deployment"""
try:
if deployment is None:
deployment = self.kube_api.get_graph_deployment(self.graph_deployment_name)
deployment = self.kube_api.get_graph_deployment(
self.graph_deployment_name
)

# TODO: benchmarks/profiler/utils/config.py already contains DGD config parsing
# TODO: benchmarks/profiler/utils/config.py already contains DGD config parsing
# and model name logic, should consolidate
prefill_service = self.get_service_from_sub_component_type_or_name(
deployment,
Expand All @@ -217,13 +230,17 @@ def get_model_name(self, deployment: Optional[dict] = None) -> str:
elif decode_model_name is None:
model_name = prefill_model_name
elif prefill_model_name != decode_model_name:
raise DeploymentModelNameMismatchError(prefill_model_name, decode_model_name)
raise DeploymentModelNameMismatchError(
prefill_model_name, decode_model_name
)
else:
model_name = prefill_model_name

except PlannerError as e:
if self.model_name:
logger.warning(f"Failed to get model name from deployment with error: {e}, using provided model name: {self.model_name}")
logger.warning(
f"Failed to get model name from deployment with error: {e}, using provided model name: {self.model_name}"
)
model_name = self.model_name
else:
raise e
Expand All @@ -232,19 +249,13 @@ def get_model_name(self, deployment: Optional[dict] = None) -> str:
if self.model_name:
if model_name != self.model_name:
raise UserProvidedModelNameMismatchError(model_name, self.model_name)

return model_name

async def wait_for_deployment_ready(
self,
max_attempts: int = 180, # default: 30 minutes total
delay_seconds: int = 10, # default: check every 10 seconds
):
async def wait_for_deployment_ready(self):
"""Wait for the deployment to be ready"""
await self.kube_api.wait_for_graph_deployment_ready(
self.graph_deployment_name,
max_attempts,
delay_seconds,
)

async def set_component_replicas(
Expand Down
7 changes: 5 additions & 2 deletions components/planner/src/dynamo/planner/utils/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ComponentError(PlannerError):

pass


class ModelNameNotFoundError(PlannerError):
"""Raised when the model name is not found in the deployment"""

Expand All @@ -78,6 +79,7 @@ def __init__(self, prefill_model_name: str, decode_model_name: str):
self.message = message
super().__init__(self.message)


class UserProvidedModelNameMismatchError(PlannerError):
"""Raised when the model name is not the same as the user provided model name"""

Expand All @@ -92,17 +94,18 @@ def __init__(self, model_name: str, user_provided_model_name: str):

class BackendFrameworkNotFoundError(PlannerError):
"""Raised when the backend framework is not supported.

This occurs when the DynamoGraphDeployment contains an unsupported backend framework.
"""

def __init__(self):
message = "Backend framework not found on DynamoGraphDeployment"
super().__init__(message)


class BackendFrameworkInvalidError(PlannerError):
"""Raised when the backend framework does not exist.

This occurs when the DynamoGraphDeployment contains an unsupported backend framework.
"""

Expand Down
14 changes: 10 additions & 4 deletions components/planner/src/dynamo/planner/utils/planner_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ def __init__(

if not args.no_operation:
if args.environment == "kubernetes":
self.connector = KubernetesConnector(self.namespace, self.model_name)
self.connector = KubernetesConnector(
self.namespace, self.model_name
)
elif args.environment == "virtual":
self.connector = VirtualConnector(
runtime, self.namespace, args.model_name,
runtime,
self.namespace,
args.model_name,
)
else:
raise ValueError(f"Invalid environment: {args.environment}")
Expand Down Expand Up @@ -246,7 +250,7 @@ async def observe_metrics(self):
self.num_d_workers_gauge.set(len(self.d_endpoints))

self.last_metrics.ttft = self.prometheus_api_client.get_avg_time_to_first_token(
f"{self.args.adjustment_interval}s",
f"{self.args.adjustment_interval}s",
self.model_name,
)
self.last_metrics.itl = self.prometheus_api_client.get_avg_inter_token_latency(
Expand Down Expand Up @@ -484,7 +488,9 @@ async def run(self):

model_name = self.connector.get_model_name()
logger.info(f"Detected model name from deployment: {model_name}")
self.model_name = model_name.lower() # normalize model name to lowercase (MDC)
self.model_name = (
model_name.lower()
) # normalize model name to lowercase (MDC)

self.last_adjustment_time = time.time()

Expand Down
32 changes: 21 additions & 11 deletions components/planner/src/dynamo/planner/utils/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import logging
import typing

from pydantic import BaseModel, ValidationError
from prometheus_api_client import PrometheusConnect
from pydantic import BaseModel, ValidationError

from dynamo.runtime.logging import configure_dynamo_logging

configure_dynamo_logging()
logger = logging.getLogger(__name__)


class FrontendMetric(BaseModel):
container: typing.Optional[str] = None
dynamo_namespace: typing.Optional[str] = None
Expand All @@ -37,7 +38,7 @@ class FrontendMetric(BaseModel):

class FrontendMetricContainer(BaseModel):
metric: FrontendMetric
value: typing.Tuple[float, float] # [timestamp, value]
value: typing.Tuple[float, float] # [timestamp, value]


class PrometheusAPIClient:
Expand All @@ -46,11 +47,11 @@ def __init__(self, url: str, dynamo_namespace: str):
self.dynamo_namespace = dynamo_namespace

def _get_average_metric(
self,
metric_name: str,
self,
metric_name: str,
interval: str,
operation_name: str,
model_name: str,
operation_name: str,
model_name: str,
) -> float:
"""
Helper method to get average metrics using the pattern:
Expand All @@ -76,13 +77,16 @@ def _get_average_metric(

values = []
for container in metrics_containers:
if container.metric.model == model_name and container.metric.dynamo_namespace == self.dynamo_namespace:
if (
container.metric.model == model_name
and container.metric.dynamo_namespace == self.dynamo_namespace
):
values.append(container.value[1])

if not values:
return 0
return sum(values) / len(values)

except Exception as e:
logger.error(f"Error getting {operation_name}: {e}")
return 0
Expand Down Expand Up @@ -120,7 +124,10 @@ def get_avg_request_count(self, interval: str, model_name: str):
metrics_containers = parse_frontend_metric_containers(raw_res)
total_count = 0.0
for container in metrics_containers:
if container.metric.model == model_name and container.metric.dynamo_namespace == self.dynamo_namespace:
if (
container.metric.model == model_name
and container.metric.dynamo_namespace == self.dynamo_namespace
):
total_count += container.value[1]
return total_count
except Exception as e:
Expand All @@ -143,12 +150,15 @@ def get_avg_output_sequence_tokens(self, interval: str, model_name: str):
model_name,
)

def parse_frontend_metric_containers(result: list[dict]) -> list[FrontendMetricContainer]:

def parse_frontend_metric_containers(
result: list[dict],
) -> list[FrontendMetricContainer]:
metrics_containers: list[FrontendMetricContainer] = []
for res in result:
try:
metrics_containers.append(FrontendMetricContainer.model_validate(res))
except ValidationError as e:
logger.error(f"Error parsing frontend metric container: {e}")
continue
return metrics_containers
return metrics_containers
31 changes: 26 additions & 5 deletions components/planner/src/dynamo/planner/virtual_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Optional

from dynamo._core import VirtualConnectorCoordinator
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
from dynamo.planner.planner_connector import PlannerConnector
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
Expand All @@ -32,7 +31,10 @@ class VirtualConnector(PlannerConnector):
"""

def __init__(
self, runtime: DistributedRuntime, dynamo_namespace: str, backend: Optional[str]
self,
runtime: DistributedRuntime,
dynamo_namespace: str,
model_name: Optional[str] = None,
):
self.connector = VirtualConnectorCoordinator(
runtime,
Expand All @@ -42,8 +44,11 @@ def __init__(
SCALING_MAX_RETRIES,
)

self.backend = backend
self.worker_component_names = WORKER_COMPONENT_NAMES[backend]
if model_name is None:
raise ValueError("Model name is required for virtual connector")

self.model_name = model_name.lower() # normalize model name to lowercase (MDC)

self.dynamo_namespace = dynamo_namespace

async def _async_init(self):
Expand Down Expand Up @@ -118,4 +123,20 @@ async def set_component_replicas(
)

if blocking:
await self._wait_for_scaling_completion()
await self._wait_for_scaling_completion()

async def validate_deployment(
self,
prefill_component_name: Optional[str] = None,
decode_component_name: Optional[str] = None,
):
"""Validate the deployment"""
pass

async def wait_for_deployment_ready(self):
"""Wait for the deployment to be ready"""
await self._wait_for_scaling_completion()

async def get_model_name(self) -> str:
"""Get the model name from the deployment"""
return self.model_name
Loading
Loading