diff --git a/deploy/cloud/helm/platform/Chart.yaml b/deploy/cloud/helm/platform/Chart.yaml
index d453189ed8..de2c5e825c 100644
--- a/deploy/cloud/helm/platform/Chart.yaml
+++ b/deploy/cloud/helm/platform/Chart.yaml
@@ -19,11 +19,11 @@ maintainers:
     url: https://www.nvidia.com
 description: A Helm chart for NVIDIA Dynamo Platform.
 type: application
-version: 0.4.0
+version: 0.4.1
 home: https://nvidia.com
 dependencies:
   - name: dynamo-operator
-    version: 0.4.0
+    version: 0.4.1
     repository: file://components/operator
     condition: dynamo-operator.enabled
   - name: nats
diff --git a/deploy/cloud/helm/platform/components/operator/Chart.yaml b/deploy/cloud/helm/platform/components/operator/Chart.yaml
index 763837b0b1..f1337176d7 100644
--- a/deploy/cloud/helm/platform/components/operator/Chart.yaml
+++ b/deploy/cloud/helm/platform/components/operator/Chart.yaml
@@ -27,9 +27,9 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.4.0
+version: 0.4.1
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: "0.4.0"
+appVersion: "0.4.1"
diff --git a/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml b/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml
index bf084e5a1b..8864c05bd2 100644
--- a/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml
+++ b/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml
@@ -128,6 +128,15 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - grove.io
+  resources:
+  - podcliques/scale
+  - podcliquescalinggroups/scale
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - apps
   resources:
diff --git a/deploy/cloud/operator/cmd/main.go b/deploy/cloud/operator/cmd/main.go
index ac8a142caa..fded2f2f52 100644
--- a/deploy/cloud/operator/cmd/main.go
+++ b/deploy/cloud/operator/cmd/main.go
@@ -31,9 +31,13 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	corev1 "k8s.io/api/core/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	"k8s.io/client-go/discovery/cached/memory"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/scale"
 	k8sCache "k8s.io/client-go/tools/cache"

 	"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -65,6 +69,34 @@ var (
 	setupLog = ctrl.Log.WithName("setup")
 )

+func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
+	config := mgr.GetConfig()
+
+	// Create kubernetes client for discovery
+	kubeClient, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create cached discovery client
+	cachedDiscovery := memory.NewMemCacheClient(kubeClient.Discovery())
+
+	// Create REST mapper
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
+
+	scalesGetter, err := scale.NewForConfig(
+		config,
+		restMapper,
+		dynamic.LegacyAPIPathResolverFunc,
+		scale.NewDiscoveryScaleKindResolver(cachedDiscovery),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return scalesGetter, nil
+}
+
 func init() {
 	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
@@ -321,11 +353,19 @@ func main() {
 		setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
 		os.Exit(1)
 	}
+	// Create scale client for Grove resource scaling
+	scaleClient, err := createScalesGetter(mgr)
+	if err != nil {
+		setupLog.Error(err, "unable to create scale client")
+		os.Exit(1)
+	}
+
 	if err = (&controller.DynamoGraphDeploymentReconciler{
 		Client:                mgr.GetClient(),
 		Recorder:              mgr.GetEventRecorderFor("dynamographdeployment"),
 		Config:                ctrlConfig,
 		DockerSecretRetriever: dockerSecretRetriever,
+		ScaleClient:           scaleClient,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeployment")
 		os.Exit(1)
diff --git a/deploy/cloud/operator/config/rbac/role.yaml b/deploy/cloud/operator/config/rbac/role.yaml
index d3983c1040..ca3770bad7 100644
--- a/deploy/cloud/operator/config/rbac/role.yaml
+++ b/deploy/cloud/operator/config/rbac/role.yaml
@@ -98,6 +98,15 @@ rules:
  - patch
  - update
  - watch
+- apiGroups:
+  - grove.io
+  resources:
+  - podcliques/scale
+  - podcliquescalinggroups/scale
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - grove.io
   resources:
diff --git a/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go b/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go
index e322c08338..df1a667a80 100644
--- a/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go
+++ b/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go
@@ -20,12 +20,17 @@ package controller
 import (
 	"context"
 	"fmt"
+	"strings"

 	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
+	"k8s.io/apimachinery/pkg/api/errors"
+
 	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -50,6 +55,20 @@ const (
 	PendingState State = "pending"
 )

+var (
+	// Grove GroupVersionResources for scaling operations
+	podCliqueGVR = schema.GroupVersionResource{
+		Group:    "grove.io",
+		Version:  "v1alpha1",
+		Resource: "podcliques",
+	}
+	podCliqueScalingGroupGVR = schema.GroupVersionResource{
+		Group:    "grove.io",
+		Version:  "v1alpha1",
+		Resource: "podcliquescalinggroups",
+	}
+)
+
 type etcdStorage interface {
 	DeleteKeys(ctx context.Context, prefix string) error
 }
@@ -60,12 +79,15 @@ type DynamoGraphDeploymentReconciler struct {
 	Config                commonController.Config
 	Recorder              record.EventRecorder
 	DockerSecretRetriever dockerSecretRetriever
+	ScaleClient           scale.ScalesGetter
 }

 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
 // +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
+// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch

 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
@@ -156,6 +178,80 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
 }

+// scaleGroveResource scales a Grove resource using the generic scaling function
+func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
+	logger := log.FromContext(ctx)
+	// Determine the GroupVersionResource based on resource type
+	var gvr schema.GroupVersionResource
+	switch resourceType {
+	case "PodClique":
+		gvr = podCliqueGVR
+	case "PodCliqueScalingGroup":
+		gvr = podCliqueScalingGroupGVR
+	default:
+		return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
+	}
+
+	// Use the generic scaling function
+	err := commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
+			logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace)
+			return nil
+		}
+	}
+	return err
+}
+
+// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
+func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
+	logger := log.FromContext(ctx)
+	logger.V(1).Info("Reconciling Grove scaling operations")
+
+	replicaIndex := 0
+	for serviceName, component := range dynamoDeployment.Spec.Services {
+		// Skip if replicas are not specified
+		if component.Replicas == nil {
+			continue
+		}
+
+		numberOfNodes := component.GetNumberOfNodes()
+		isMultinode := numberOfNodes > 1
+
+		if isMultinode {
+			// Scale PodCliqueScalingGroup for multinode services
+			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
+			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
+			err := r.scaleGroveResource(ctx,
+				resourceName,
+				dynamoDeployment.Namespace,
+				*component.Replicas,
+				"PodCliqueScalingGroup")
+			if err != nil {
+				logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
+				return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err)
+			}
+		} else {
+			// Scale individual PodClique for single-node services
+			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
+			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
+			err := r.scaleGroveResource(ctx,
+				resourceName,
+				dynamoDeployment.Namespace,
+				*component.Replicas,
+				"PodClique")
+			if err != nil {
+				logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
+				return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err)
+			}
+		}
+	}
+
+	logger.V(1).Info("Successfully reconciled Grove scaling operations")
+	return nil
+}
+
 func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
 	logger := log.FromContext(ctx)
 	// generate the dynamoComponentsDeployments from the config
@@ -177,6 +273,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 		}
 		return false
 	})
+
+	// Handle Grove scaling operations after structural changes
+	if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
+		logger.Error(err, "failed to reconcile Grove scaling")
+		return FailedState, "grove_scaling_failed", Message(err.Error()), err
+	}
+
 	resources := []Resource{groveGangSetAsResource}
 	for componentName, component := range dynamoDeployment.Spec.Services {
 		if component.ComponentType == consts.ComponentTypeFrontend {
@@ -203,10 +306,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 				ingressSpec = *component.Ingress
 			}
 			mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
-			if err != nil {
-				logger.Error(err, "failed to generate the main component ingress")
-				return "", "", "", fmt.Errorf("failed to generate the main component ingress: %w", err)
-			}
 			_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
 				if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
 					logger.Info("Ingress is not enabled")
@@ -224,10 +323,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 			// generate the main component virtual service
 			if r.Config.IngressConfig.UseVirtualService() {
 				mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
-				if err != nil {
-					logger.Error(err, "failed to generate the main component virtual service")
-					return "", "", "", fmt.Errorf("failed to generate the main component virtual service: %w", err)
-				}
 				_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
 					if !ingressSpec.IsVirtualServiceEnabled() {
 						logger.Info("VirtualService is not enabled")
diff --git a/deploy/cloud/operator/internal/controller_common/scale.go b/deploy/cloud/operator/internal/controller_common/scale.go
new file mode 100644
index 0000000000..cc711e8351
--- /dev/null
+++ b/deploy/cloud/operator/internal/controller_common/scale.go
@@ -0,0 +1,76 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller_common
+
+import (
+	"context"
+	"fmt"
+
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// ScaleResource scales any Kubernetes resource using the Scale subresource
+func ScaleResource(ctx context.Context, scaleClient scale.ScalesGetter, gvr schema.GroupVersionResource, namespace, name string, replicas int32) error {
+	logger := log.FromContext(ctx)
+	logger.Info("Scaling resource", "gvr", gvr, "name", name, "namespace", namespace, "replicas", replicas)
+
+	if scaleClient == nil {
+		logger.Error(nil, "Scale client is nil")
+		return fmt.Errorf("scale client is nil")
+	}
+
+	currentScale, err := scaleClient.Scales(namespace).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
+	if err != nil {
+		logger.Error(err, "Failed to get current scale - resource may not support scale subresource", "gvr", gvr, "name", name, "namespace", namespace, "groupResource", gvr.GroupResource())
+		return fmt.Errorf("failed to get current scale for %s %s (resource may not support scale subresource): %w", gvr.Resource, name, err)
+	}
+
+	if replicas < 0 {
+		return fmt.Errorf("replicas must be >= 0, got %d", replicas)
+	}
+
+	if currentScale.Spec.Replicas == replicas {
+		logger.V(1).Info("Resource already at desired replica count", "gvr", gvr, "name", name, "replicas", replicas)
+		return nil
+	}
+
+	scaleObj := &autoscalingv1.Scale{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Namespace:       namespace,
+			ResourceVersion: currentScale.ObjectMeta.ResourceVersion,
+		},
+		Spec: autoscalingv1.ScaleSpec{
+			Replicas: replicas,
+		},
+	}
+
+	logger.V(1).Info("Updating scale", "gvr", gvr, "name", name, "newReplicas", replicas)
+	_, err = scaleClient.Scales(namespace).Update(ctx, gvr.GroupResource(), scaleObj, metav1.UpdateOptions{})
+	if err != nil {
+		logger.Error(err, "Failed to update scale", "gvr", gvr, "name", name, "replicas", replicas)
+		return fmt.Errorf("failed to update scale for %s %s: %w", gvr.Resource, name, err)
+	}
+
+	logger.Info("Successfully scaled resource", "gvr", gvr, "name", name, "oldReplicas", currentScale.Spec.Replicas, "newReplicas", replicas)
+	return nil
+}
diff --git a/deploy/helm/chart/Chart.yaml b/deploy/helm/chart/Chart.yaml
index 9afc10e96d..85390caeff 100644
--- a/deploy/helm/chart/Chart.yaml
+++ b/deploy/helm/chart/Chart.yaml
@@ -17,5 +17,5 @@ apiVersion: v2
 name: dynamo-graph
 description: A Helm chart to deploy a Dynamo graph on Kubernetes
 type: application
-version: 0.4.0
-appVersion: 0.4.0
+version: 0.4.1
+appVersion: 0.4.1
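For reference, below is a minimal standalone sketch (not part of the patch) showing how the scale client built by createScalesGetter drives the Grove scale subresource, assuming a reachable cluster with the Grove CRDs installed. The namespace "default" and the resource name "my-dgd-0-worker" are hypothetical placeholders; the GVR is the same one the controller uses for multinode services.

// Illustrative sketch only: wire a ScalesGetter as createScalesGetter does,
// then read and update the replica count of a Grove PodCliqueScalingGroup
// through the scale subresource.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/scale"
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	cfg := ctrl.GetConfigOrDie()

	// Discovery-backed REST mapper, mirroring createScalesGetter
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	cached := memory.NewMemCacheClient(kubeClient.Discovery())
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(cached)

	scales, err := scale.NewForConfig(cfg, mapper, dynamic.LegacyAPIPathResolverFunc, scale.NewDiscoveryScaleKindResolver(cached))
	if err != nil {
		panic(err)
	}

	gvr := schema.GroupVersionResource{Group: "grove.io", Version: "v1alpha1", Resource: "podcliquescalinggroups"}
	ctx := context.Background()

	// Get the current scale, then update replicas via the scale subresource
	cur, err := scales.Scales("default").Get(ctx, gvr.GroupResource(), "my-dgd-0-worker", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	cur.Spec.Replicas = 3
	if _, err := scales.Scales("default").Update(ctx, gvr.GroupResource(), cur, metav1.UpdateOptions{}); err != nil {
		panic(err)
	}
	fmt.Println("scaled", gvr.Resource, cur.Name, "to", cur.Spec.Replicas)
}

The get/update calls above are exactly what the new RBAC rules for podcliques/scale and podcliquescalinggroups/scale (verbs get, patch, update) permit when this runs under the operator's service account.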