@@ -137,6 +137,13 @@ rules:
  - get
  - patch
  - update
- apiGroups:
  - scheduling.run.ai
  resources:
  - queues
  verbs:
  - get
  - list
- apiGroups:
  - apps
  resources:
@@ -475,6 +482,45 @@ roleRef:
  {{- end }}
  name: '{{ include "dynamo-operator.fullname" . }}-manager-role'
subjects:
- kind: ServiceAccount
  name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
  namespace: '{{ .Release.Namespace }}'
---
# ClusterRole for kai-scheduler queue access
# This is always a ClusterRole since Queue resources are cluster-scoped
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: {{ include "dynamo-operator.fullname" . }}-queue-reader
  labels:
    app.kubernetes.io/component: rbac
    app.kubernetes.io/created-by: dynamo-operator
    app.kubernetes.io/part-of: dynamo-operator
    {{- include "dynamo-operator.labels" . | nindent 4 }}
rules:
- apiGroups:
  - scheduling.run.ai
  resources:
  - queues
  verbs:
  - get
  - list
---
# ClusterRoleBinding for kai-scheduler queue access
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {{ include "dynamo-operator.fullname" . }}-queue-reader-binding
  labels:
    app.kubernetes.io/component: rbac
    app.kubernetes.io/created-by: dynamo-operator
    app.kubernetes.io/part-of: dynamo-operator
    {{- include "dynamo-operator.labels" . | nindent 4 }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ include "dynamo-operator.fullname" . }}-queue-reader
subjects:
- kind: ServiceAccount
  name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
  namespace: '{{ .Release.Namespace }}'
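The pair of objects above grants the controller-manager ServiceAccount read-only access to the cluster-scoped Queue CRD. To confirm the binding from inside the operator pod, a probe along these lines — a minimal sketch assuming in-cluster credentials, with the wiring illustrative and not part of this PR — asks the API server directly whether the identity may list queues:

```go
// Sketch: verify the queue-reader binding took effect for the current identity.
package main

import (
	"context"
	"fmt"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // uses the pod's mounted ServiceAccount token
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	// Ask the API server whether *this* identity can list queues cluster-wide.
	review := &authorizationv1.SelfSubjectAccessReview{
		Spec: authorizationv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Group:    "scheduling.run.ai",
				Resource: "queues",
				Verb:     "list",
			},
		},
	}
	resp, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().
		Create(context.Background(), review, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("can list queues:", resp.Status.Allowed) // true once the binding is applied
}
```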
8 changes: 8 additions & 0 deletions deploy/cloud/operator/cmd/main.go
@@ -172,6 +172,9 @@ func main() {
		LWS: commonController.LWSConfig{
			Enabled: false, // Will be set after LWS discovery
		},
		KaiScheduler: commonController.KaiSchedulerConfig{
			Enabled: false, // Will be set after Kai-scheduler discovery
		},
		EtcdAddress: etcdAddr,
		NatsAddress: natsAddr,
		IngressConfig: commonController.IngressConfig{
@@ -247,6 +250,11 @@ func main() {
	lwsEnabled := commonController.DetectLWSAvailability(mainCtx, mgr)
	ctrlConfig.LWS.Enabled = lwsEnabled

	// Detect Kai-scheduler availability using discovery client
	setupLog.Info("Detecting Kai-scheduler availability...")
	kaiSchedulerEnabled := commonController.DetectKaiSchedulerAvailability(mainCtx, mgr)
	ctrlConfig.KaiScheduler.Enabled = kaiSchedulerEnabled

	// Create etcd client
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{etcdAddr},
7 changes: 7 additions & 0 deletions deploy/cloud/operator/config/rbac/role.yaml
@@ -185,6 +185,13 @@ rules:
  - get
  - patch
  - update
- apiGroups:
  - scheduling.run.ai
  resources:
  - queues
  verbs:
  - get
  - list
- apiGroups:
  - scheduling.volcano.sh
  resources:
34 changes: 33 additions & 1 deletion deploy/cloud/operator/internal/consts/consts.go
@@ -1,6 +1,10 @@
package consts

import "time"
import (
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

const (
	HPACPUDefaultAverageUtilization = 80
@@ -55,6 +59,12 @@ const (
	DefaultSharedMemoryMountPath = "/dev/shm"
	DefaultSharedMemorySize      = "8Gi"

	// Kai-scheduler related constants
	KubeAnnotationKaiSchedulerQueue = "nvidia.com/kai-scheduler-queue" // User-provided annotation to specify queue name
	KubeLabelKaiSchedulerQueue      = "kai.scheduler/queue"            // Label injected into pods for kai-scheduler
	KaiSchedulerName                = "kai-scheduler"                  // Scheduler name for kai-scheduler
	DefaultKaiSchedulerQueue        = "dynamo"                         // Default queue name when none specified

	// Grove multinode role suffixes
	GroveRoleSuffixLeader = "ldr"
	GroveRoleSuffixWorker = "wkr"
@@ -68,3 +78,25 @@ const (
	MultinodeDeploymentTypeGrove MultinodeDeploymentType = "grove"
	MultinodeDeploymentTypeLWS   MultinodeDeploymentType = "lws"
)

// GroupVersionResources for external APIs
var (
	// Grove GroupVersionResources for scaling operations
	PodCliqueGVR = schema.GroupVersionResource{
		Group:    "grove.io",
		Version:  "v1alpha1",
		Resource: "podcliques",
	}
	PodCliqueScalingGroupGVR = schema.GroupVersionResource{
		Group:    "grove.io",
		Version:  "v1alpha1",
		Resource: "podcliquescalinggroups",
	}

	// KAI-Scheduler GroupVersionResource for queue validation
	QueueGVR = schema.GroupVersionResource{
		Group:    "scheduling.run.ai",
		Version:  "v2",
		Resource: "queues",
	}
)
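These exported GVRs are what the operator hands to the dynamic client, which maps each GroupVersionResource onto its REST path — QueueGVR resolves to /apis/scheduling.run.ai/v2/queues. A minimal standalone sketch of that mapping, not from this PR and with the GVR inlined rather than imported:

```go
// Lists kai-scheduler queues via the dynamic client. Because queues are
// cluster-scoped, no Namespace(...) scoping call is needed.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	ctrl "sigs.k8s.io/controller-runtime"
)

// Inlined copy of what the PR exposes as consts.QueueGVR.
var queueGVR = schema.GroupVersionResource{
	Group:    "scheduling.run.ai",
	Version:  "v2",
	Resource: "queues",
}

func main() {
	cfg, err := ctrl.GetConfig() // kubeconfig or in-cluster config
	if err != nil {
		panic(err)
	}
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	list, err := dyn.Resource(queueGVR).List(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, q := range list.Items {
		fmt.Println(q.GetName())
	}
}
```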
@@ -55,20 +55,6 @@ const (
	PendingState State = "pending"
)

var (
	// Grove GroupVersionResources for scaling operations
	podCliqueGVR = schema.GroupVersionResource{
		Group:    "grove.io",
		Version:  "v1alpha1",
		Resource: "podcliques",
	}
	podCliqueScalingGroupGVR = schema.GroupVersionResource{
		Group:    "grove.io",
		Version:  "v1alpha1",
		Resource: "podcliquescalinggroups",
	}
)

type etcdStorage interface {
	DeleteKeys(ctx context.Context, prefix string) error
}
@@ -88,6 +74,7 @@ type DynamoGraphDeploymentReconciler struct {
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -201,9 +188,9 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context
	var gvr schema.GroupVersionResource
	switch resourceType {
	case "PodClique":
		gvr = podCliqueGVR
		gvr = consts.PodCliqueGVR
	case "PodCliqueScalingGroup":
		gvr = podCliqueScalingGroupGVR
		gvr = consts.PodCliqueScalingGroupGVR
	default:
		return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
	}
13 changes: 13 additions & 0 deletions deploy/cloud/operator/internal/controller_common/predicate.go
@@ -42,11 +42,17 @@ type LWSConfig struct {
	Enabled bool
}

type KaiSchedulerConfig struct {
	// Enabled is automatically determined by checking if Kai-scheduler CRDs are installed in the cluster
	Enabled bool
}

type Config struct {
	// Enable resources filtering, only the resources belonging to the given namespace will be handled.
	RestrictedNamespace string
	Grove               GroveConfig
	LWS                 LWSConfig
	KaiScheduler        KaiSchedulerConfig
	EtcdAddress         string
	NatsAddress         string
	IngressConfig       IngressConfig
@@ -75,6 +81,12 @@ func DetectLWSAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	return detectAPIGroupAvailability(ctx, mgr, "leaderworkerset.x-k8s.io")
}

// DetectKaiSchedulerAvailability checks whether Kai-scheduler is available by verifying that the scheduling.run.ai API group is registered.
// This approach uses the discovery client, which is simpler and more reliable.
func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	return detectAPIGroupAvailability(ctx, mgr, "scheduling.run.ai")
}

// detectAPIGroupAvailability checks if a specific API group is registered in the cluster
func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
	logger := log.FromContext(ctx)
@@ -107,6 +119,7 @@ func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
	logger.Info("API group not available", "group", groupName)
	return false
}

func EphemeralDeploymentEventFilter(config Config) predicate.Predicate {
	return predicate.NewPredicateFuncs(func(o client.Object) bool {
		l := log.FromContext(context.Background())
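The body of detectAPIGroupAvailability is collapsed in this diff; only its tail is visible above. From the visible pieces — the manager, the contextual logger, the closing log lines — a plausible reconstruction using the discovery client would look roughly like the following. This is a sketch, not the PR's actual code:

```go
package controller_common

import (
	"context"

	"k8s.io/client-go/discovery"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

// detectAPIGroupAvailabilitySketch mirrors what the collapsed helper plausibly
// does: list the server's API groups and scan for the requested one.
func detectAPIGroupAvailabilitySketch(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
	logger := log.FromContext(ctx)

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
	if err != nil {
		logger.Error(err, "Failed to create discovery client", "group", groupName)
		return false
	}

	groups, err := discoveryClient.ServerGroups()
	if err != nil {
		logger.Error(err, "Failed to list API groups", "group", groupName)
		return false
	}

	for _, g := range groups.Groups {
		if g.Name == groupName {
			logger.Info("API group available", "group", groupName)
			return true
		}
	}

	logger.Info("API group not available", "group", groupName)
	return false
}
```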
15 changes: 15 additions & 0 deletions deploy/cloud/operator/internal/dynamo/graph.go
@@ -882,6 +882,17 @@ func GenerateGrovePodGangSet(
	if controllerConfig.Grove.TerminationDelay > 0 {
		gangSet.Spec.Template.TerminationDelay = &metav1.Duration{Duration: controllerConfig.Grove.TerminationDelay}
	}

	// Validate the kai-scheduler queue once, when both Grove and kai-scheduler are enabled
	var validatedQueueName string
	if controllerConfig.Grove.Enabled && controllerConfig.KaiScheduler.Enabled {
		var err error
		validatedQueueName, err = DetermineKaiSchedulerQueue(ctx, dynamoDeployment.Annotations)
		if err != nil {
			return nil, fmt.Errorf("failed to determine kai-scheduler queue: %w", err)
		}
	}

	dynamoNamespace, err := getDynamoNamespace(dynamoDeployment)
	if err != nil {
		return nil, fmt.Errorf("failed to get the graph dynamo namespace: %w", err)
@@ -935,6 +946,10 @@ func GenerateGrovePodGangSet(
			return nil, fmt.Errorf("failed to generate annotations: %w", err)
		}
		clique.Annotations = annotations

		// Inject kai-scheduler settings if enabled
		injectKaiSchedulerIfEnabled(clique, controllerConfig, validatedQueueName)

		gangSet.Spec.Template.Cliques = append(gangSet.Spec.Template.Cliques, clique)
		cliqueNames = append(cliqueNames, strings.ToLower(r.Name))
	}
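To see what the injection call above does to each clique, here is a self-contained toy — not from this PR, using hypothetical stripped-down stand-ins for the Grove types that carry only the two fields the PR touches — which reproduces the guard for a manually chosen scheduler:

```go
package main

import "fmt"

// Hypothetical stand-ins for the Grove types, reduced to the fields that
// injectKaiSchedulerIfEnabled actually touches.
type podSpec struct{ SchedulerName string }
type cliqueSpec struct{ PodSpec podSpec }
type podCliqueTemplateSpec struct {
	Labels map[string]string
	Spec   cliqueSpec
}

// inject mirrors the PR's injection: set the scheduler name and queue label,
// but respect a manually chosen non-kai scheduler.
func inject(clique *podCliqueTemplateSpec, queue string) {
	if clique.Spec.PodSpec.SchedulerName != "" && clique.Spec.PodSpec.SchedulerName != "kai-scheduler" {
		return
	}
	clique.Spec.PodSpec.SchedulerName = "kai-scheduler"
	if clique.Labels == nil {
		clique.Labels = map[string]string{}
	}
	clique.Labels["kai.scheduler/queue"] = queue
}

func main() {
	c := &podCliqueTemplateSpec{}
	inject(c, "dynamo")
	fmt.Println(c.Spec.PodSpec.SchedulerName, c.Labels["kai.scheduler/queue"]) // kai-scheduler dynamo

	custom := &podCliqueTemplateSpec{Spec: cliqueSpec{PodSpec: podSpec{SchedulerName: "volcano"}}}
	inject(custom, "dynamo")
	fmt.Println(custom.Spec.PodSpec.SchedulerName, len(custom.Labels)) // volcano 0
}
```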
96 changes: 96 additions & 0 deletions deploy/cloud/operator/internal/dynamo/grove.go
@@ -14,6 +14,10 @@ import (

	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
	commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/dynamic"
	ctrl "sigs.k8s.io/controller-runtime"
)

type GroveMultinodeDeployer struct {
@@ -130,3 +134,95 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam

	return true, ""
}

// resolveKaiSchedulerQueueName extracts the queue name from annotations or falls back to the default.
// This is the shared logic behind DetermineKaiSchedulerQueue and ResolveKaiSchedulerQueue.
func resolveKaiSchedulerQueueName(annotations map[string]string) string {
	queueName := commonconsts.DefaultKaiSchedulerQueue
	if annotations != nil {
		if annotationQueue, exists := annotations[commonconsts.KubeAnnotationKaiSchedulerQueue]; exists && strings.TrimSpace(annotationQueue) != "" {
			queueName = strings.TrimSpace(annotationQueue)
		}
	}
	return queueName
}

// ensureQueueExists validates that a Queue resource with the given name exists in the cluster.
// Returns an error if the queue doesn't exist or if validation fails.
func ensureQueueExists(ctx context.Context, dynamicClient dynamic.Interface, queueName string) error {
	logger := log.FromContext(ctx)

	// Try to get the queue resource using the predefined GVR
	_, err := dynamicClient.Resource(commonconsts.QueueGVR).Get(ctx, queueName, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			logger.Error(err, "Queue not found", "queueName", queueName)
			return fmt.Errorf("queue '%s' not found in cluster. Ensure the queue exists before using kai-scheduler", queueName)
		}
		logger.Error(err, "Failed to validate queue", "queueName", queueName)
		return fmt.Errorf("failed to validate queue '%s': %w", queueName, err)
	}

	logger.Info("Queue validation successful", "queueName", queueName)
	return nil
}

// DetermineKaiSchedulerQueue determines the queue name for kai-scheduler from deployment annotations, falling back to the default.
// It also validates that the queue exists in the cluster.
func DetermineKaiSchedulerQueue(ctx context.Context, annotations map[string]string) (string, error) {
	// Get the queue name from annotation or use default
	queueName := resolveKaiSchedulerQueueName(annotations)

	// Create a dynamic client for CRD validation (Queue CRD might not be in the standard client scheme)
	cfg, err := ctrl.GetConfig()
	if err != nil {
		return "", fmt.Errorf("failed to get kubernetes config for queue validation: %w", err)
	}

	dynamicClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return "", fmt.Errorf("failed to create dynamic client for queue validation: %w", err)
	}

	// Validate that the queue exists
	if err := ensureQueueExists(ctx, dynamicClient, queueName); err != nil {
		return "", fmt.Errorf("kai-scheduler queue validation failed: %w", err)
	}

	return queueName, nil
}

// ResolveKaiSchedulerQueue determines the queue name for kai-scheduler from deployment annotations, falling back to the default.
// It does NOT validate the queue; use DetermineKaiSchedulerQueue for validation.
func ResolveKaiSchedulerQueue(annotations map[string]string) string {
	return resolveKaiSchedulerQueueName(annotations)
}

// injectKaiSchedulerIfEnabled injects kai-scheduler settings into a clique when both Grove and kai-scheduler are enabled
func injectKaiSchedulerIfEnabled(
	clique *grovev1alpha1.PodCliqueTemplateSpec,
	controllerConfig controller_common.Config,
	validatedQueueName string,
) {
	// Only proceed if grove and kai-scheduler are both enabled
	if !controllerConfig.Grove.Enabled || !controllerConfig.KaiScheduler.Enabled {
		return
	}

	// Check if user has manually set schedulerName - if so, respect their choice
	if clique.Spec.PodSpec.SchedulerName != "" && clique.Spec.PodSpec.SchedulerName != commonconsts.KaiSchedulerName {
		return
	}

	// Use the pre-validated queue name
	queueName := validatedQueueName

	// Inject schedulerName
	clique.Spec.PodSpec.SchedulerName = commonconsts.KaiSchedulerName

	// Inject queue label
	if clique.Labels == nil {
		clique.Labels = make(map[string]string)
	}
	clique.Labels[commonconsts.KubeLabelKaiSchedulerQueue] = queueName
}
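The fallback semantics of resolveKaiSchedulerQueueName are worth pinning down with concrete values. A self-contained mirror — not from this PR; the constants are copied from consts.go and the queue names are illustrative:

```go
package main

import (
	"fmt"
	"strings"
)

const (
	kubeAnnotationKaiSchedulerQueue = "nvidia.com/kai-scheduler-queue"
	defaultKaiSchedulerQueue        = "dynamo"
)

// resolve mirrors resolveKaiSchedulerQueueName: the annotation wins, but
// empty or whitespace-only values fall back to the default queue.
func resolve(annotations map[string]string) string {
	if v, ok := annotations[kubeAnnotationKaiSchedulerQueue]; ok && strings.TrimSpace(v) != "" {
		return strings.TrimSpace(v)
	}
	return defaultKaiSchedulerQueue // reading a nil map is safe in Go
}

func main() {
	fmt.Println(resolve(nil))                                                          // "dynamo"
	fmt.Println(resolve(map[string]string{kubeAnnotationKaiSchedulerQueue: "team-a"})) // "team-a"
	fmt.Println(resolve(map[string]string{kubeAnnotationKaiSchedulerQueue: "   "}))    // "dynamo"
}
```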