diff --git a/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller_test.go b/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller_test.go index d00d8a5cd8..e4ed8b16f3 100644 --- a/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller_test.go +++ b/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller_test.go @@ -681,6 +681,12 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. ObjectMeta: metav1.ObjectMeta{ Name: "test-lws-deploy", Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "DynamoGraphDeployment", + Name: "test-lws-deploy", + }, + }, }, Spec: v1alpha1.DynamoComponentDeploymentSpec{ DynamoComponent: "test-lws-component", @@ -805,7 +811,16 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. Image: "test-image:latest", Command: []string{"sh", "-c"}, Args: []string{"ray start --head --port=6379 && some dynamo command"}, - Env: []corev1.EnvVar{{Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"}, {Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"}}, + Env: []corev1.EnvVar{ + {Name: "DYN_NAMESPACE", Value: "default"}, + {Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"}, + {Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"}, + {Name: "DYN_SYSTEM_ENABLED", Value: "true"}, + {Name: "DYN_SYSTEM_PORT", Value: "9090"}, + {Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: `["generate"]`}, + {Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"}, + {Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"}, + }, Ports: []corev1.ContainerPort{ { Protocol: corev1.ProtocolTCP, Name: commonconsts.DynamoSystemPortName, ContainerPort: commonconsts.DynamoSystemPort, @@ -905,7 +920,16 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. Image: "test-image:latest", Command: []string{"sh", "-c"}, Args: []string{"ray start --address=${LWS_LEADER_ADDRESS}:6379 --block"}, - Env: []corev1.EnvVar{{Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"}, {Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"}}, + Env: []corev1.EnvVar{ + {Name: "DYN_NAMESPACE", Value: "default"}, + {Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"}, + {Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"}, + {Name: "DYN_SYSTEM_ENABLED", Value: "true"}, + {Name: "DYN_SYSTEM_PORT", Value: "9090"}, + {Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: `["generate"]`}, + {Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"}, + {Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"}, + }, Ports: []corev1.ContainerPort{ { Protocol: corev1.ProtocolTCP, Name: commonconsts.DynamoSystemPortName, ContainerPort: commonconsts.DynamoSystemPort, diff --git a/deploy/cloud/operator/internal/dynamo/graph.go b/deploy/cloud/operator/internal/dynamo/graph.go index 0e8b473578..375cb5a703 100644 --- a/deploy/cloud/operator/internal/dynamo/graph.go +++ b/deploy/cloud/operator/internal/dynamo/graph.go @@ -701,13 +701,14 @@ func GenerateBasePodSpec( main := component.ExtraPodSpec.MainContainer.DeepCopy() if main != nil { // merge the extraPodSpec from the parent deployment with the extraPodSpec from the service + containerEnvs := container.Env err = mergo.Merge(&container, *main, mergo.WithOverride) if err != nil { return nil, fmt.Errorf("failed to merge extraPodSpec: %w", err) } // main container fields that require special handling - container.Env = MergeEnvs(component.Envs, container.Env) + container.Env = MergeEnvs(containerEnvs, container.Env) // Note: startup probe does not have its own top level field so it must be passed in extraPodSpec.MainContainer // We want to overwrite entirely if provided rather than merge if main.StartupProbe != nil { @@ -715,6 +716,7 @@ func GenerateBasePodSpec( } } } + container.Env = MergeEnvs(component.Envs, container.Env) // Merge probes entirely if they are passed (no partial merge) if component.LivenessProbe != nil { diff --git a/deploy/cloud/operator/internal/dynamo/graph_test.go b/deploy/cloud/operator/internal/dynamo/graph_test.go index 496c34efef..e754988bdf 100644 --- a/deploy/cloud/operator/internal/dynamo/graph_test.go +++ b/deploy/cloud/operator/internal/dynamo/graph_test.go @@ -4392,3 +4392,140 @@ func TestGenerateBasePodSpec_PlannerServiceAccount(t *testing.T) { }) } } + +func TestGenerateBasePodSpec_Worker(t *testing.T) { + secretsRetriever := &mockSecretsRetriever{} + controllerConfig := controller_common.Config{} + + tests := []struct { + name string + component *v1alpha1.DynamoComponentDeploymentOverridesSpec + expectedPodSpec *corev1.PodSpec + }{ + { + name: "Planner component should have planner service account", + component: &v1alpha1.DynamoComponentDeploymentOverridesSpec{ + DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{ + Envs: []corev1.EnvVar{ + {Name: "ANOTHER_COMPONENTENV", Value: "true"}, + }, + ComponentType: commonconsts.ComponentTypeWorker, + ExtraPodSpec: &common.ExtraPodSpec{ + MainContainer: &corev1.Container{ + Command: []string{"python3"}, + Args: []string{"-m", "dynamo.worker"}, + Env: []corev1.EnvVar{ + {Name: "ANOTHER_CONTAINER_ENV", Value: "true"}, + }, + }, + }, + }, + }, + expectedPodSpec: &corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Command: []string{"python3"}, + Args: []string{"-m", "dynamo.worker"}, + Env: []corev1.EnvVar{ + {Name: "ANOTHER_COMPONENTENV", Value: "true"}, + {Name: "ANOTHER_CONTAINER_ENV", Value: "true"}, + {Name: "DYN_NAMESPACE", Value: ""}, + {Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-deployment"}, + {Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"}, + {Name: "DYN_SYSTEM_ENABLED", Value: "true"}, + {Name: "DYN_SYSTEM_PORT", Value: "9090"}, + {Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"}, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "shared-memory", + MountPath: "/dev/shm", + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/live", + Port: intstr.FromString(commonconsts.DynamoSystemPortName), + }, + }, + PeriodSeconds: 5, + TimeoutSeconds: 30, + FailureThreshold: 1, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString(commonconsts.DynamoSystemPortName), + }, + }, + PeriodSeconds: 10, + TimeoutSeconds: 30, + FailureThreshold: 60, + }, + StartupProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/live", + Port: intstr.FromString(commonconsts.DynamoSystemPortName), + }, + }, + PeriodSeconds: 10, + TimeoutSeconds: 5, + FailureThreshold: 60, + }, + Ports: []corev1.ContainerPort{ + { + Name: commonconsts.DynamoSystemPortName, + ContainerPort: int32(commonconsts.DynamoSystemPort), + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyAlways, + TerminationGracePeriodSeconds: ptr.To(int64(60)), + Volumes: []corev1.Volume{ + { + Name: "shared-memory", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + SizeLimit: func() *resource.Quantity { q := resource.MustParse("512Mi"); return &q }(), + }, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + podSpec, err := GenerateBasePodSpec( + tt.component, + BackendFrameworkSGLang, + secretsRetriever, + "test-deployment", + "default", + RoleMain, + 1, + controllerConfig, + commonconsts.MultinodeDeploymentTypeGrove, + "test-service", + ) + + if err != nil { + t.Errorf("GenerateBasePodSpec() error = %v", err) + return + } + + diff := cmp.Diff(tt.expectedPodSpec, podSpec) + if diff != "" { + t.Errorf("GenerateBasePodSpec() podSpec = %v, want %v, diff = %v", podSpec, tt.expectedPodSpec, diff) + } + }) + } +}