4 changes: 2 additions & 2 deletions deploy/cloud/helm/platform/Chart.yaml
@@ -19,11 +19,11 @@ maintainers:
     url: https://www.nvidia.com
 description: A Helm chart for NVIDIA Dynamo Platform.
 type: application
-version: 0.4.0
+version: 0.4.1
 home: https://nvidia.com
 dependencies:
   - name: dynamo-operator
-    version: 0.4.0
+    version: 0.4.1
     repository: file://components/operator
     condition: dynamo-operator.enabled
   - name: nats
4 changes: 2 additions & 2 deletions deploy/cloud/helm/platform/components/operator/Chart.yaml
@@ -27,9 +27,9 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.4.0
+version: 0.4.1
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: "0.4.0"
+appVersion: "0.4.1"
@@ -128,6 +128,15 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - grove.io
+  resources:
+  - podcliques/scale
+  - podcliquescalinggroups/scale
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - apps
   resources:
40 changes: 40 additions & 0 deletions deploy/cloud/operator/cmd/main.go
@@ -31,9 +31,13 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	corev1 "k8s.io/api/core/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	"k8s.io/client-go/discovery/cached/memory"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/scale"
 	k8sCache "k8s.io/client-go/tools/cache"
 	"sigs.k8s.io/controller-runtime/pkg/cache"

@@ -65,6 +69,34 @@
 	setupLog = ctrl.Log.WithName("setup")
 )
 
+func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
+	config := mgr.GetConfig()
+
+	// Create kubernetes client for discovery
+	kubeClient, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create cached discovery client
+	cachedDiscovery := memory.NewMemCacheClient(kubeClient.Discovery())
+
+	// Create REST mapper
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
+
+	scalesGetter, err := scale.NewForConfig(
+		config,
+		restMapper,
+		dynamic.LegacyAPIPathResolverFunc,
+		scale.NewDiscoveryScaleKindResolver(cachedDiscovery),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return scalesGetter, nil
+}
+
 func init() {
 	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
 
@@ -321,11 +353,19 @@ func main() {
 		setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
 		os.Exit(1)
 	}
+	// Create scale client for Grove resource scaling
+	scaleClient, err := createScalesGetter(mgr)
+	if err != nil {
+		setupLog.Error(err, "unable to create scale client")
+		os.Exit(1)
+	}
+
 	if err = (&controller.DynamoGraphDeploymentReconciler{
 		Client:                mgr.GetClient(),
 		Recorder:              mgr.GetEventRecorderFor("dynamographdeployment"),
 		Config:                ctrlConfig,
 		DockerSecretRetriever: dockerSecretRetriever,
+		ScaleClient:           scaleClient,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeployment")
 		os.Exit(1)
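[Note, not part of the diff] The ScalesGetter built by createScalesGetter resolves the /scale subresource of any discoverable resource through the cached discovery client and deferred REST mapper. A minimal sketch of reading a Grove PodClique's current replica count through it; the namespace and object name are hypothetical placeholders:

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/scale"
)

// currentPodCliqueReplicas reads the /scale subresource of a (hypothetical) Grove PodClique.
func currentPodCliqueReplicas(ctx context.Context, scales scale.ScalesGetter) (int32, error) {
	gr := schema.GroupResource{Group: "grove.io", Resource: "podcliques"}
	s, err := scales.Scales("my-namespace").Get(ctx, gr, "my-dgd-0-frontend", metav1.GetOptions{})
	if err != nil {
		return 0, fmt.Errorf("reading scale subresource: %w", err)
	}
	return s.Spec.Replicas, nil
}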
9 changes: 9 additions & 0 deletions deploy/cloud/operator/config/rbac/role.yaml
@@ -98,6 +98,15 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - grove.io
+  resources:
+  - podcliques/scale
+  - podcliquescalinggroups/scale
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - grove.io
   resources:
@@ -20,12 +20,17 @@ package controller
 import (
 	"context"
 	"fmt"
+	"strings"
 
 	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
+	"k8s.io/apimachinery/pkg/api/errors"
+
 	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -50,6 +55,20 @@ const (
 	PendingState State = "pending"
 )
 
+var (
+	// Grove GroupVersionResources for scaling operations
+	podCliqueGVR = schema.GroupVersionResource{
+		Group:    "grove.io",
+		Version:  "v1alpha1",
+		Resource: "podcliques",
+	}
+	podCliqueScalingGroupGVR = schema.GroupVersionResource{
+		Group:    "grove.io",
+		Version:  "v1alpha1",
+		Resource: "podcliquescalinggroups",
+	}
+)
+
 type etcdStorage interface {
 	DeleteKeys(ctx context.Context, prefix string) error
 }
@@ -60,12 +79,15 @@ type DynamoGraphDeploymentReconciler struct {
 	Config                commonController.Config
 	Recorder              record.EventRecorder
 	DockerSecretRetriever dockerSecretRetriever
+	ScaleClient           scale.ScalesGetter
 }
 
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
 // +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
+// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
@@ -156,6 +178,80 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context

 }
 
+// scaleGroveResource scales a Grove resource using the generic scaling function
+func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
+	logger := log.FromContext(ctx)
+	// Determine the GroupVersionResource based on resource type
+	var gvr schema.GroupVersionResource
+	switch resourceType {
+	case "PodClique":
+		gvr = podCliqueGVR
+	case "PodCliqueScalingGroup":
+		gvr = podCliqueScalingGroupGVR
+	default:
+		return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
+	}
+
+	// Use the generic scaling function
+	err := commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
+			logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace)
+			return nil
+		}
+	}
+	return err
+}
+
+// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
+func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
+	logger := log.FromContext(ctx)
+	logger.V(1).Info("Reconciling Grove scaling operations")
+
+	replicaIndex := 0
+	for serviceName, component := range dynamoDeployment.Spec.Services {
+		// Skip if replicas are not specified
+		if component.Replicas == nil {
+			continue
+		}
+
+		numberOfNodes := component.GetNumberOfNodes()
+		isMultinode := numberOfNodes > 1
+
+		if isMultinode {
+			// Scale PodCliqueScalingGroup for multinode services
+			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
+			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
+			err := r.scaleGroveResource(ctx,
+				resourceName,
+				dynamoDeployment.Namespace,
+				*component.Replicas,
+				"PodCliqueScalingGroup")
+			if err != nil {
+				logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
+				return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err)
+			}
+		} else {
+			// Scale individual PodClique for single-node services
+			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
+			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
+			err := r.scaleGroveResource(ctx,
+				resourceName,
+				dynamoDeployment.Namespace,
+				*component.Replicas,
+				"PodClique")
+			if err != nil {
+				logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
+				return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err)
+			}
+		}
+	}
+
+	logger.V(1).Info("Successfully reconciled Grove scaling operations")
+	return nil
+}
+
 func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
 	logger := log.FromContext(ctx)
 	// generate the dynamoComponentsDeployments from the config
@@ -177,6 +273,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 		}
 		return false
 	})
+
+	// Handle Grove scaling operations after structural changes
+	if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
+		logger.Error(err, "failed to reconcile Grove scaling")
+		return FailedState, "grove_scaling_failed", Message(err.Error()), err
+	}
+
 	resources := []Resource{groveGangSetAsResource}
 	for componentName, component := range dynamoDeployment.Spec.Services {
 		if component.ComponentType == consts.ComponentTypeFrontend {
@@ -203,10 +306,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 				ingressSpec = *component.Ingress
 			}
 			mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
-			if err != nil {
-				logger.Error(err, "failed to generate the main component ingress")
-				return "", "", "", fmt.Errorf("failed to generate the main component ingress: %w", err)
-			}
 			_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
 				if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
 					logger.Info("Ingress is not enabled")
@@ -224,10 +323,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 			// generate the main component virtual service
 			if r.Config.IngressConfig.UseVirtualService() {
 				mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
-				if err != nil {
-					logger.Error(err, "failed to generate the main component virtual service")
-					return "", "", "", fmt.Errorf("failed to generate the main component virtual service: %w", err)
-				}
 				_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
 					if !ingressSpec.IsVirtualServiceEnabled() {
 						logger.Info("VirtualService is not enabled")
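[Note, not part of the diff] The resource names passed to scaleGroveResource follow the Grove naming pattern cited in the comments above, {DGD.name}-{replicaIndex}-{lowercased serviceName}, with replicaIndex fixed at 0 in this PR. A tiny illustration with made-up values:

package main

import (
	"fmt"
	"strings"
)

func main() {
	dgdName := "my-graph"       // hypothetical DynamoGraphDeployment name
	serviceName := "VllmWorker" // hypothetical key from spec.services
	replicaIndex := 0           // always 0 in the loop above
	fmt.Printf("%s-%d-%s\n", dgdName, replicaIndex, strings.ToLower(serviceName))
	// prints: my-graph-0-vllmworker
}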
76 changes: 76 additions & 0 deletions deploy/cloud/operator/internal/controller_common/scale.go
@@ -0,0 +1,76 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller_common
+
+import (
+	"context"
+	"fmt"
+
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// ScaleResource scales any Kubernetes resource using the Scale subresource
+func ScaleResource(ctx context.Context, scaleClient scale.ScalesGetter, gvr schema.GroupVersionResource, namespace, name string, replicas int32) error {
+	logger := log.FromContext(ctx)
+	logger.Info("Scaling resource", "gvr", gvr, "name", name, "namespace", namespace, "replicas", replicas)
+
+	if scaleClient == nil {
+		logger.Error(nil, "Scale client is nil")
+		return fmt.Errorf("scale client is nil")
+	}
+
+	currentScale, err := scaleClient.Scales(namespace).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
+	if err != nil {
+		logger.Error(err, "Failed to get current scale - resource may not support scale subresource", "gvr", gvr, "name", name, "namespace", namespace, "groupResource", gvr.GroupResource())
+		return fmt.Errorf("failed to get current scale for %s %s (resource may not support scale subresource): %w", gvr.Resource, name, err)
+	}
+
+	if replicas < 0 {
+		return fmt.Errorf("replicas must be >= 0, got %d", replicas)
+	}
+
+	if currentScale.Spec.Replicas == replicas {
+		logger.V(1).Info("Resource already at desired replica count", "gvr", gvr, "name", name, "replicas", replicas)
+		return nil
+	}
+
+	scaleObj := &autoscalingv1.Scale{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Namespace:       namespace,
+			ResourceVersion: currentScale.ObjectMeta.ResourceVersion,
+		},
+		Spec: autoscalingv1.ScaleSpec{
+			Replicas: replicas,
+		},
+	}
+
+	logger.V(1).Info("Updating scale", "gvr", gvr, "name", name, "newReplicas", replicas)
+	_, err = scaleClient.Scales(namespace).Update(ctx, gvr.GroupResource(), scaleObj, metav1.UpdateOptions{})
+	if err != nil {
+		logger.Error(err, "Failed to update scale", "gvr", gvr, "name", name, "replicas", replicas)
+		return fmt.Errorf("failed to update scale for %s %s: %w", gvr.Resource, name, err)
+	}
+
+	logger.Info("Successfully scaled resource", "gvr", gvr, "name", name, "oldReplicas", currentScale.Spec.Replicas, "newReplicas", replicas)
+	return nil
+}
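[Note, not part of the diff] A sketch of a call site for ScaleResource, mirroring how scaleGroveResource uses it. It is written as if it lived in the same controller_common package; the GVR values come from this PR, while the namespace, object name, and replica count are placeholders:

package controller_common

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/scale"
)

// scalePodCliqueExample scales a hypothetical Grove PodClique to 3 replicas.
// ScaleResource reads the current /scale and no-ops if already at the desired count.
func scalePodCliqueExample(ctx context.Context, scales scale.ScalesGetter) error {
	gvr := schema.GroupVersionResource{
		Group:    "grove.io",
		Version:  "v1alpha1",
		Resource: "podcliques",
	}
	return ScaleResource(ctx, scales, gvr, "my-namespace", "my-dgd-0-frontend", 3)
}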
4 changes: 2 additions & 2 deletions deploy/helm/chart/Chart.yaml
@@ -17,5 +17,5 @@ apiVersion: v2
 name: dynamo-graph
 description: A Helm chart to deploy a Dynamo graph on Kubernetes
 type: application
-version: 0.4.0
-appVersion: 0.4.0
+version: 0.4.1
+appVersion: 0.4.1