Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b00cc28
add sub component type
tmonty12 Sep 19, 2025
433c71e
add testing
tmonty12 Sep 19, 2025
11f9e59
planner using prometheus env var endpoint
tmonty12 Sep 22, 2025
8eb0665
planner uses subComponentType and testingt
tmonty12 Sep 23, 2025
ea84c64
use deployment validation error
tmonty12 Sep 23, 2025
0a71b68
add back copyright header
tmonty12 Sep 23, 2025
3ec215d
backwards compatibility with framework component name
tmonty12 Sep 23, 2025
411c02f
update pre deployment profiling to use subComponentType
tmonty12 Sep 24, 2025
c59dd08
small comments
tmonty12 Sep 24, 2025
8e5dfda
update planner manifest to remove prometheus svc and use subComponent…
tmonty12 Sep 24, 2025
69bc1a9
update sla planner deployment docs
tmonty12 Sep 24, 2025
7dc6696
fix doc link
tmonty12 Sep 24, 2025
752b4ac
update profiler config and fix ci
tmonty12 Sep 24, 2025
0761925
small fixes
tmonty12 Sep 24, 2025
51782ff
more small fixes
tmonty12 Sep 24, 2025
3cc6677
revert changes to profiler - will do so in follow on PR
tmonty12 Sep 24, 2025
88b4181
args not optional
tmonty12 Sep 24, 2025
4da6983
small docs update
tmonty12 Sep 24, 2025
20504a2
properly parse prometheus metrics
tmonty12 Sep 26, 2025
3661f5d
fix ci
tmonty12 Sep 26, 2025
74e054c
fix virtual_connector
tmonty12 Sep 26, 2025
823327d
fix mypy
tmonty12 Sep 26, 2025
2c85f2d
remove prometheus server
tmonty12 Sep 26, 2025
0b54bca
pc
tedzhouhk Sep 26, 2025
15d524e
add subComponentType, remove prometheus installation, remove service …
tmonty12 Sep 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
planner uses subComponentType and testingt
Signed-off-by: tmontfort <[email protected]>
  • Loading branch information
tmonty12 committed Sep 26, 2025
commit 8eb06659c828c77f48bfc661c833633e76cc96d6
56 changes: 20 additions & 36 deletions components/planner/src/dynamo/planner/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
# limitations under the License.

import asyncio
import os
import logging
from typing import Optional

from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException

from dynamo.planner.utils.exceptions import DynamoGraphDeploymentNotFoundError
from dynamo.runtime.logging import configure_dynamo_logging

configure_dynamo_logging()
logger = logging.getLogger(__name__)


def get_current_k8s_namespace() -> str:
"""Get the current namespace if running inside a k8s cluster"""
Expand Down Expand Up @@ -54,38 +60,24 @@ def _get_graph_deployment_from_name(
name=graph_deployment_name,
)

async def get_parent_graph_deployment(self) -> Optional[dict]:
async def get_graph_deployment(self, graph_deployment_name: str) -> dict:
"""
Get the parent DynamoGraphDeployment using environment variable.

Uses DYN_PARENT_DGD_K8S_NAME environment variable and assumes the DGD
is in the same namespace as this component (self.current_namespace).
Get the parent DynamoGraphDeployment

Returns:
The DynamoGraphDeployment object or None if env var is not set
The DynamoGraphDeployment object

Raises:
DynamoGraphDeploymentNotFoundError: If the parent graph deployment is not found
"""
dgd_name = os.getenv("DYN_PARENT_DGD_K8S_NAME")

if not dgd_name:
return None

try:
return self._get_graph_deployment_from_name(dgd_name)
return self._get_graph_deployment_from_name(graph_deployment_name)
except client.ApiException as e:
if e.status == 404:
return None
raise DynamoGraphDeploymentNotFoundError(deployment_name=graph_deployment_name, namespace=self.current_namespace)
raise

async def get_graph_deployment(self) -> Optional[dict]:
"""
Get the parent DynamoGraphDeployment using environment variable.

Returns:
The DynamoGraphDeployment object or None if env var is not set
"""
return await self.get_parent_graph_deployment()

async def update_graph_replicas(
def update_graph_replicas(
self, graph_deployment_name: str, component_name: str, replicas: int
) -> None:
"""Update the replicas count for a component in a DynamoGraphDeployment"""
Expand All @@ -99,15 +91,10 @@ async def update_graph_replicas(
body=patch,
)

async def is_deployment_ready(self, graph_deployment_name: str) -> bool:
def is_deployment_ready(self, deployment: dict) -> bool:
"""Check if a graph deployment is ready"""

graph_deployment = self._get_graph_deployment_from_name(graph_deployment_name)

if not graph_deployment:
raise ValueError(f"Graph deployment {graph_deployment_name} not found")

conditions = graph_deployment.get("status", {}).get("conditions", [])
conditions = deployment.get("status", {}).get("conditions", [])
ready_condition = next(
(c for c in conditions if c.get("type") == "Ready"), None
)
Expand All @@ -125,13 +112,10 @@ async def wait_for_graph_deployment_ready(
for attempt in range(max_attempts):
await asyncio.sleep(delay_seconds)

graph_deployment = self._get_graph_deployment_from_name(
graph_deployment = await self.get_graph_deployment(
graph_deployment_name
)

if not graph_deployment:
raise ValueError(f"Graph deployment {graph_deployment_name} not found")

conditions = graph_deployment.get("status", {}).get("conditions", [])
ready_condition = next(
(c for c in conditions if c.get("type") == "Ready"), None
Expand All @@ -140,7 +124,7 @@ async def wait_for_graph_deployment_ready(
if ready_condition and ready_condition.get("status") == "True":
return # Deployment is ready

print(
logger.info(
f"[Attempt {attempt + 1}/{max_attempts}] "
f"(status: {ready_condition.get('status') if ready_condition else 'N/A'}, "
f"message: {ready_condition.get('message') if ready_condition else 'no condition found'})"
Expand Down
182 changes: 131 additions & 51 deletions components/planner/src/dynamo/planner/kubernetes_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,103 +14,183 @@
# limitations under the License.

import logging
import os
from typing import Optional

from pydantic import BaseModel

from dynamo.planner.utils.exceptions import (
DeploymentValidationError,
DuplicateSubComponentError,
EmptyTargetReplicasError,
SubComponentConfigurationError,
SubComponentNotFoundError,
)
from dynamo.planner.kube import KubernetesAPI
from dynamo.planner.planner_connector import PlannerConnector
from dynamo.runtime.logging import configure_dynamo_logging

configure_dynamo_logging()
logger = logging.getLogger(__name__)

class Service(BaseModel):
name: str
service: dict

def number_replicas(self) -> int:
return self.service.get("replicas", 0)


class KubernetesConnector(PlannerConnector):
def __init__(self, dynamo_namespace: str, k8s_namespace: Optional[str] = None):
self.kube_api = KubernetesAPI(k8s_namespace)
self.dynamo_namespace = dynamo_namespace

async def add_component(self, component_name: str, blocking: bool = True):
graph_deployment_name = os.getenv("DYN_PARENT_DGD_K8S_NAME")
if not graph_deployment_name:
raise ValueError("DYN_PARENT_DGD_K8S_NAME environment variable is not set")

self.graph_deployment_name = graph_deployment_name

async def add_component(self, sub_component_type: str, blocking: bool = True):
"""Add a component by increasing its replica count by 1"""

deployment = await self.kube_api.get_graph_deployment()
if deployment is None:
raise ValueError("Parent DynamoGraphDeployment not found")
deployment = await self.kube_api.get_graph_deployment(self.graph_deployment_name)

# get current replicas or 1 if not found
current_replicas = self._get_current_replicas(deployment, component_name)
await self.kube_api.update_graph_replicas(
self._get_graph_deployment_name(deployment),
component_name,
current_replicas + 1,
service = self.get_service_from_sub_component_type(deployment, sub_component_type)
self.kube_api.update_graph_replicas(
self.graph_deployment_name,
service.name,
service.number_replicas() + 1,
)
if blocking:
await self.kube_api.wait_for_graph_deployment_ready(
self._get_graph_deployment_name(deployment)
self.graph_deployment_name,
)

async def remove_component(self, component_name: str, blocking: bool = True):
async def remove_component(self, sub_component_type: str, blocking: bool = True):
"""Remove a component by decreasing its replica count by 1"""

deployment = await self.kube_api.get_graph_deployment()
if deployment is None:
raise ValueError("Parent DynamoGraphDeployment not found")

# get current replicas or 1 if not found
current_replicas = self._get_current_replicas(deployment, component_name)
if current_replicas > 0:
await self.kube_api.update_graph_replicas(
self._get_graph_deployment_name(deployment),
component_name,
current_replicas - 1,
deployment = await self.kube_api.get_graph_deployment(self.graph_deployment_name)

service = self.get_service_from_sub_component_type(deployment, sub_component_type)
if service.number_replicas() > 0:
self.kube_api.update_graph_replicas(
self.graph_deployment_name,
service.name,
service.number_replicas() - 1,
)
if blocking:
await self.kube_api.wait_for_graph_deployment_ready(
self._get_graph_deployment_name(deployment)
self.graph_deployment_name,
)

async def verify_prefill_and_decode_components_exist(self):
"""
Verify that the deployment contains services with subComponentType prefill and decode

Raises:
DeploymentValidationError: If the deployment does not contain services with subComponentType prefill and decode
"""
deployment = await self.kube_api.get_graph_deployment(self.graph_deployment_name)

errors = []

try:
self.get_service_from_sub_component_type(deployment, "prefill")
except (SubComponentConfigurationError) as e:
errors.append(str(e))

try:
self.get_service_from_sub_component_type(deployment, "decode")
except (SubComponentConfigurationError) as e:
errors.append(str(e))

# Raise combined error if any issues found
if errors:
raise DeploymentValidationError(errors)

async def wait_for_deployment_ready(
self,
max_attempts: int = 180, # default: 30 minutes total
delay_seconds: int = 10, # default: check every 10 seconds
):
"""Wait for the deployment to be ready"""
await self.kube_api.wait_for_graph_deployment_ready(
self.graph_deployment_name,
max_attempts,
delay_seconds,
)

async def set_component_replicas(
self, target_replicas: dict[str, int], blocking: bool = True
):
"""Set the replicas for multiple components at once"""
if not target_replicas:
raise ValueError("target_replicas cannot be empty")
raise EmptyTargetReplicasError()

deployment = await self.kube_api.get_graph_deployment()
if deployment is None:
raise ValueError("Parent DynamoGraphDeployment not found")
deployment = await self.kube_api.get_graph_deployment(self.graph_deployment_name)

if not await self.kube_api.is_deployment_ready(
self._get_graph_deployment_name(deployment)
):
if not self.kube_api.is_deployment_ready(deployment):
logger.warning(
f"Deployment {self._get_graph_deployment_name(deployment)} is not ready, ignoring this scaling"
f"Deployment {self.graph_deployment_name} is not ready, ignoring this scaling"
)
return

for component_name, replicas in target_replicas.items():
await self.kube_api.update_graph_replicas(
self._get_graph_deployment_name(deployment),
component_name,
replicas,
)
for sub_component_type, replicas in target_replicas.items():
service = self.get_service_from_sub_component_type(deployment, sub_component_type)
current_replicas = service.number_replicas()
if current_replicas != replicas:
logger.info(f"Updating {sub_component_type} component {service.name} to desired replica count {replicas}")
self.kube_api.update_graph_replicas(
self.graph_deployment_name,
service.name,
replicas,
)
else:
logger.info(f"{sub_component_type} component {service.name} already at desired replica count {replicas}, skipping")

if blocking:
await self.kube_api.wait_for_graph_deployment_ready(
self._get_graph_deployment_name(deployment)
self.graph_deployment_name,
)

def _get_current_replicas(self, deployment: dict, component_name: str) -> int:
"""Get the current replicas for a component in a graph deployment"""
return (
deployment.get("spec", {})
.get("services", {})
.get(component_name, {})
.get("replicas", 1)
)

def _get_graph_deployment_name(self, deployment: dict) -> str:
"""Get the name of the graph deployment"""
return deployment["metadata"]["name"]

def get_service_from_sub_component_type(self, deployment: dict, sub_component_type: str) -> Service:
"""
Get the current replicas for a component in a graph deployment

Returns: Service object

Raises:
SubComponentNotFoundError: If no service with the specified subComponentType is found
DuplicateSubComponentError: If multiple services with the same subComponentType are found
"""
services = deployment.get("spec", {}).get("services", {})

# Collect all available subComponentTypes for better error messages
available_types = []
matching_services = []

for curr_name, curr_service in services.items():
service_sub_type = curr_service.get("subComponentType", "")
if service_sub_type:
available_types.append(service_sub_type)

if service_sub_type == sub_component_type:
matching_services.append((curr_name, curr_service))

# Check for duplicates
if len(matching_services) > 1:
service_names = [name for name, _ in matching_services]
raise DuplicateSubComponentError(sub_component_type, service_names)

# Check if not found
if not matching_services:
raise SubComponentNotFoundError(sub_component_type)

name, service = matching_services[0]
return Service(name=name, service=service)


if __name__ == "__main__":
Expand Down
Loading