4 changes: 2 additions & 2 deletions deploy/cloud/helm/platform/Chart.yaml
@@ -19,11 +19,11 @@ maintainers:
url: https://www.nvidia.com
description: A Helm chart for NVIDIA Dynamo Platform.
type: application
version: 0.4.0
version: 0.4.1
home: https://nvidia.com
dependencies:
- name: dynamo-operator
version: 0.4.0
version: 0.4.1
repository: file://components/operator
condition: dynamo-operator.enabled
- name: nats
4 changes: 2 additions & 2 deletions deploy/cloud/helm/platform/components/operator/Chart.yaml
@@ -27,9 +27,9 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.4.0
version: 0.4.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.4.0"
appVersion: "0.4.1"
@@ -128,6 +128,15 @@ rules:
- patch
- update
- watch
- apiGroups:
- grove.io
resources:
- podcliques/scale
- podcliquescalinggroups/scale
verbs:
- get
- patch
- update
- apiGroups:
- apps
resources:
38 changes: 38 additions & 0 deletions deploy/cloud/operator/cmd/main.go
@@ -31,9 +31,13 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
k8sCache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"

@@ -65,6 +69,32 @@ var (
setupLog = ctrl.Log.WithName("setup")
)

func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
config := mgr.GetConfig()

// Create kubernetes client for discovery
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

// Create cached discovery client
cachedDiscovery := memory.NewMemCacheClient(kubeClient.Discovery())

// Create REST mapper
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)

// Use scale.New with the kubernetes client's REST client
scalesGetter := scale.New(
kubeClient.CoreV1().RESTClient(),
restMapper,
dynamic.LegacyAPIPathResolverFunc,
scale.NewDiscoveryScaleKindResolver(cachedDiscovery),
)

return scalesGetter, nil
}

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

@@ -321,11 +351,19 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
os.Exit(1)
}
// Create scale client for Grove resource scaling
scaleClient, err := createScalesGetter(mgr)
if err != nil {
setupLog.Error(err, "unable to create scale client")
os.Exit(1)
}

if err = (&controller.DynamoGraphDeploymentReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamographdeployment"),
Config: ctrlConfig,
DockerSecretRetriever: dockerSecretRetriever,
ScaleClient: scaleClient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeployment")
os.Exit(1)
9 changes: 9 additions & 0 deletions deploy/cloud/operator/config/rbac/role.yaml
@@ -98,6 +98,15 @@ rules:
- patch
- update
- watch
- apiGroups:
- grove.io
resources:
- podcliques/scale
- podcliquescalinggroups/scale
verbs:
- get
- patch
- update
- apiGroups:
- grove.io
resources:
@@ -20,12 +20,15 @@ package controller
import (
"context"
"fmt"
"strings"

grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -50,6 +53,20 @@ const (
PendingState State = "pending"
)

var (
// Grove GroupVersionResources for scaling operations
podCliqueGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliques",
}
podCliqueScalingGroupGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliquescalinggroups",
}
)

type etcdStorage interface {
DeleteKeys(ctx context.Context, prefix string) error
}
@@ -60,12 +77,15 @@ type DynamoGraphDeploymentReconciler struct {
Config commonController.Config
Recorder record.EventRecorder
DockerSecretRetriever dockerSecretRetriever
ScaleClient scale.ScalesGetter
}

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -156,6 +176,71 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context

}

// scaleGroveResource scales a Grove resource using the generic scaling function
func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
// Determine the GroupVersionResource based on resource type
var gvr schema.GroupVersionResource
switch resourceType {
case "PodClique":
gvr = podCliqueGVR
case "PodCliqueScalingGroup":
gvr = podCliqueScalingGroupGVR
default:
return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
}

// Use the generic scaling function
return commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
}

// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
logger := log.FromContext(ctx)
logger.V(1).Info("Reconciling Grove scaling operations")

replicaIndex := 0
for serviceName, component := range dynamoDeployment.Spec.Services {
// Skip if replicas are not specified
if component.Replicas == nil {
continue
}

numberOfNodes := component.GetNumberOfNodes()
isMultinode := numberOfNodes > 1

if isMultinode {
// Scale PodCliqueScalingGroup for multinode services
// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
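// e.g. a DGD named "my-graph" with replicaIndex 0 and service "Worker" yields "my-graph-0-worker" (names are illustrative)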
resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
err := r.scaleGroveResource(ctx,
resourceName,
dynamoDeployment.Namespace,
*component.Replicas,
"PodCliqueScalingGroup")
if err != nil {
logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err)
}
} else {
// Scale individual PodClique for single-node services
// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
err := r.scaleGroveResource(ctx,
resourceName,
dynamoDeployment.Namespace,
*component.Replicas,
"PodClique")
if err != nil {
logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err)
}
}
}

logger.V(1).Info("Successfully reconciled Grove scaling operations")
return nil
}

func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
logger := log.FromContext(ctx)
// generate the dynamoComponentsDeployments from the config
@@ -177,6 +262,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
}
return false
})

// Handle Grove scaling operations after structural changes
if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
logger.Error(err, "failed to reconcile Grove scaling")
return FailedState, "grove_scaling_failed", Message(err.Error()), err
}

resources := []Resource{groveGangSetAsResource}
for componentName, component := range dynamoDeployment.Spec.Services {
if component.ComponentType == consts.ComponentTypeFrontend {
@@ -203,10 +295,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
ingressSpec = *component.Ingress
}
mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
if err != nil {
logger.Error(err, "failed to generate the main component ingress")
return "", "", "", fmt.Errorf("failed to generate the main component ingress: %w", err)
}
_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
logger.Info("Ingress is not enabled")
@@ -224,10 +312,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
// generate the main component virtual service
if r.Config.IngressConfig.UseVirtualService() {
mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
if err != nil {
logger.Error(err, "failed to generate the main component virtual service")
return "", "", "", fmt.Errorf("failed to generate the main component virtual service: %w", err)
}
_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
if !ingressSpec.IsVirtualServiceEnabled() {
logger.Info("VirtualService is not enabled")
77 changes: 77 additions & 0 deletions deploy/cloud/operator/internal/controller_common/scale.go
@@ -0,0 +1,77 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller_common

import (
"context"
"fmt"

autoscalingv1 "k8s.io/api/autoscaling/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// ScaleResource scales any Kubernetes resource using the Scale subresource
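// Example (illustrative; hypothetical namespace and resource name):
//
//	ScaleResource(ctx, scalesGetter,
//	    schema.GroupVersionResource{Group: "grove.io", Version: "v1alpha1", Resource: "podcliques"},
//	    "my-namespace", "my-graph-0-worker", 3)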
func ScaleResource(ctx context.Context, scaleClient scale.ScalesGetter, gvr schema.GroupVersionResource, namespace, name string, replicas int32) error {
logger := log.FromContext(ctx)
logger.Info("Scaling resource", "gvr", gvr, "name", name, "namespace", namespace, "replicas", replicas)

if scaleClient == nil {
logger.Error(nil, "Scale client is nil")
return fmt.Errorf("scale client is nil")
}

currentScale, err := scaleClient.Scales(namespace).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", name, "namespace", namespace)
return nil
}
logger.Error(err, "Failed to get current scale - resource may not support scale subresource", "gvr", gvr, "name", name, "namespace", namespace, "groupResource", gvr.GroupResource())
return fmt.Errorf("failed to get current scale for %s %s (resource may not support scale subresource): %w", gvr.Resource, name, err)
}

if currentScale.Spec.Replicas == replicas {
logger.V(1).Info("Resource already at desired replica count", "gvr", gvr, "name", name, "replicas", replicas)
return nil
}

scaleObj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: replicas,
},
}

logger.V(1).Info("Updating scale", "gvr", gvr, "name", name, "newReplicas", replicas)
_, err = scaleClient.Scales(namespace).Update(ctx, gvr.GroupResource(), scaleObj, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Failed to update scale", "gvr", gvr, "name", name, "replicas", replicas)
return fmt.Errorf("failed to update scale for %s %s: %w", gvr.Resource, name, err)
}

logger.Info("Successfully scaled resource", "gvr", gvr, "name", name, "oldReplicas", currentScale.Spec.Replicas, "newReplicas", replicas)
return nil
}
4 changes: 2 additions & 2 deletions deploy/helm/chart/Chart.yaml
@@ -17,5 +17,5 @@ apiVersion: v2
name: dynamo-graph
description: A Helm chart to deploy a Dynamo graph on Kubernetes
type: application
version: 0.4.0
appVersion: 0.4.0
version: 0.4.1
appVersion: 0.4.1