diff --git a/test/extended/etcd/helpers/helpers.go b/test/extended/etcd/helpers/helpers.go
index b8b242a9bad6..c5774782e833 100644
--- a/test/extended/etcd/helpers/helpers.go
+++ b/test/extended/etcd/helpers/helpers.go
@@ -15,9 +15,11 @@ import (
 	machineclient "github.com/openshift/client-go/machine/clientset/versioned"
 	machinev1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
 	machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
+	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
 
 	bmhelper "github.com/openshift/origin/test/extended/baremetal"
 	exutil "github.com/openshift/origin/test/extended/util"
+	"github.com/openshift/origin/test/extended/util/image"
 
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,6 +32,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"k8s.io/utils/pointer"
 )
@@ -597,6 +600,114 @@ func MachineNameToEtcdMemberName(ctx context.Context, kubeClient kubernetes.Inte
 	return "", fmt.Errorf("unable to find a node for the corresponding %q machine on the following machine's IPs: %v, checked: %v", machineName, machineIPListSet.List(), nodeNames)
 }
 
+// MasterNodes returns a list of master nodes
+func MasterNodes(ctx context.Context, kubeClient kubernetes.Interface) []*corev1.Node {
+	masterNodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: masterNodeRoleLabel})
+	o.Expect(err).NotTo(o.HaveOccurred())
+
+	var nodes []*corev1.Node
+	for i := range masterNodes.Items {
+		node := &masterNodes.Items[i]
+		nodes = append(nodes, node)
+	}
+	return nodes
+}
+
+// NodeNameToMachineName finds the machine name that corresponds to the given node name.
+// It first looks for a machine that corresponds to the node by comparing the ProviderID fields.
+//
+// In case the node's ProviderID is empty, it falls back to finding a machine that matches one of the node's internal IP addresses.
+//
+// Note:
+// it returns an error in case no matching machine was found.
+func NodeNameToMachineName(ctx context.Context, kubeClient kubernetes.Interface, machineClient machinev1beta1client.MachineInterface, nodeName string) (string, error) {
+	node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+	if err != nil {
+		return "", err
+	}
+
+	masterMachines, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
+	if err != nil {
+		return "", err
+	}
+
+	if node.Spec.ProviderID != "" {
+		// case 1: find the corresponding machine by matching on ProviderID
+		var machineNames []string
+		for _, masterMachine := range masterMachines.Items {
+			if pointer.StringDeref(masterMachine.Spec.ProviderID, "") == node.Spec.ProviderID {
+				return masterMachine.Name, nil
+			}
+			machineNames = append(machineNames, masterMachine.Name)
+		}
+
+		return "", fmt.Errorf("unable to find a machine for the corresponding %q node on ProviderID: %v, checked: %v", nodeName, node.Spec.ProviderID, machineNames)
+	}
+
+	// case 2: match on an internal IP address
+	nodeIPListSet := sets.NewString()
+	for _, addr := range node.Status.Addresses {
+		if addr.Type == corev1.NodeInternalIP {
+			nodeIPListSet.Insert(addr.Address)
+		}
+	}
+
+	var machineNames []string
+	for _, masterMachine := range masterMachines.Items {
+		machineNames = append(machineNames, masterMachine.Name)
+		for _, addr := range masterMachine.Status.Addresses {
+			if addr.Type == corev1.NodeInternalIP {
+				if nodeIPListSet.Has(addr.Address) {
+					return masterMachine.Name, nil
+				}
+			}
+		}
+	}
+
+	return "", fmt.Errorf("unable to find a machine for the corresponding %q node on the following node IPs: %v, checked: %v", nodeName, nodeIPListSet.List(), machineNames)
+}
+
+// StopKubelet stops the kubelet on the given node by spawning a pod on that node and running the stop kubelet command
+func StopKubelet(ctx context.Context, adminKubeClient kubernetes.Interface, node corev1.Node) error {
+	podSpec := applycorev1.PodSpec().WithRestartPolicy(corev1.RestartPolicyNever).WithHostNetwork(true).WithHostPID(true)
+	podSpec.Containers = []applycorev1.ContainerApplyConfiguration{
+		*applycorev1.Container().
+			WithName("kubelet-stopper").
+			WithSecurityContext(applycorev1.SecurityContext().WithPrivileged(true).WithRunAsUser(0)).
+			WithImage(image.ShellImage()).
+			WithVolumeMounts(applycorev1.VolumeMount().WithName("host").WithMountPath("/host")).
+			WithCommand("/bin/sh").
+			WithArgs("-c", "chroot /host /bin/sh -c 'sleep 1 && systemctl stop kubelet'"),
+	}
+	podSpec.NodeSelector = map[string]string{"kubernetes.io/hostname": node.Labels["kubernetes.io/hostname"]}
+	podSpec.Tolerations = []applycorev1.TolerationApplyConfiguration{*applycorev1.Toleration().WithOperator(corev1.TolerationOpExists)}
+	podSpec.Volumes = []applycorev1.VolumeApplyConfiguration{
+		*applycorev1.Volume().WithName("host").WithHostPath(applycorev1.HostPathVolumeSource().WithPath("/").WithType("Directory")),
+	}
+
+	pod := applycorev1.Pod("kubelet-stopper", "openshift-etcd").WithSpec(podSpec)
+	createdPod, err := adminKubeClient.CoreV1().Pods(*pod.Namespace).Apply(context.Background(), pod, metav1.ApplyOptions{FieldManager: *pod.Name})
+	if err != nil {
+		return fmt.Errorf("error applying pod: %w", err)
+	}
+
+	err = wait.PollUntilContextTimeout(ctx, 15*time.Second, 5*time.Minute, true, func(ctx context.Context) (done bool, err error) {
+		p, err := adminKubeClient.CoreV1().Pods("openshift-etcd").Get(context.Background(), createdPod.Name, metav1.GetOptions{})
+		if err != nil {
+			return true, fmt.Errorf("error retrieving the pod %s: %w", createdPod.Name, err)
+		}
+
+		if p.Status.Phase == corev1.PodFailed {
+			return true, fmt.Errorf("pod failed with status %v", p.Status)
+		}
+
+		// once the kubelet on the node has been stopped it can no longer keep the pod Ready,
+		// so a Ready condition of False is treated as the signal that the kubelet was stopped
+		podReadyCondition := e2epod.FindPodConditionByType(&p.Status, corev1.PodReady)
+		if podReadyCondition == nil {
+			return false, nil
+		}
+
+		return podReadyCondition.Status == corev1.ConditionFalse, nil
+	})
+	return err
+}
+
 func InitPlatformSpecificConfiguration(oc *exutil.CLI) func() {
 	SkipIfUnsupportedPlatform(context.TODO(), oc)
diff --git a/test/extended/etcd/vertical_scaling.go b/test/extended/etcd/vertical_scaling.go
index 3ce63166faec..fc57981981e5 100644
--- a/test/extended/etcd/vertical_scaling.go
+++ b/test/extended/etcd/vertical_scaling.go
@@ -2,6 +2,8 @@ package etcd
 
 import (
 	"context"
+	"fmt"
+	"time"
 
 	g "github.com/onsi/ginkgo/v2"
 	o "github.com/onsi/gomega"
@@ -11,14 +13,16 @@ import (
 	machinev1 "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
 	machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
 	testlibraryapi "github.com/openshift/library-go/test/library/apiserver"
-	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 
 	scalingtestinglibrary "github.com/openshift/origin/test/extended/etcd/helpers"
"github.com/openshift/origin/test/extended/etcd/helpers" exutil "github.com/openshift/origin/test/extended/util" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" ) var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd", func() { @@ -293,4 +297,125 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient) o.Expect(err).ToNot(o.HaveOccurred()) }) + + // The following test covers a vertical scaling scenario when a member is unhealthy. + // This test validates that scale down happens before scale up if the deleted member is unhealthy. + // CPMS is disabled to observe that scale-down happens first in this case. + // + // 1) If the CPMS is active, first disable it by deleting the CPMS custom resource. + // 2) Remove the static pod manifest from a node and stop the kubelet on the node. This makes the member unhealthy. + // 3) Delete the machine hosting the node in step 2. + // 4) Verify the member removal and the total voting member count of 2 to ensure scale-down happens first when a member is unhealthy. + // 5) Restore the initial cluster state by creating a new machine(scale-up) and re-enabling CPMS + g.It("is able to vertically scale down when a member is unhealthy [apigroup:machine.openshift.io]", func() { + framework.Logf("This test validates that scale-down happens before scale up if the deleted member is unhealthy") + + etcdNamespace := "openshift-etcd" + etcdTargetNode := scalingtestinglibrary.MasterNodes(ctx, kubeClient)[0] + + // retrive the pod on the etcdTargetNode that will be removed when the static pod manifest is moved from the node + etcdPods, err := kubeClient.CoreV1().Pods(etcdNamespace).List(ctx, metav1.ListOptions{LabelSelector: "app=etcd", FieldSelector: "spec.nodeName=" + etcdTargetNode.Name}) + err = errors.Wrapf(err, "pre-test: failed to retrieve the etcd pod on the node %s", etcdTargetNode.Name) + o.Expect(err).ToNot(o.HaveOccurred()) + etcdTargetPod := etcdPods.Items[0] + + if cpmsActive { + //step 0: disable the CPMS + framework.Logf("Disable the CPMS") + err := scalingtestinglibrary.DisableCPMS(ctx, g.GinkgoT(), cpmsClient) + err = errors.Wrap(err, "pre-test: failed to disable the CPMS") + o.Expect(err).ToNot(o.HaveOccurred()) + + // re-enable CPMS after the test + defer func() { + framework.Logf("Re-enable the CPMS") + err := scalingtestinglibrary.EnableCPMS(ctx, g.GinkgoT(), cpmsClient) + err = errors.Wrap(err, "post-test: failed to re-enable the CPMS") + o.Expect(err).ToNot(o.HaveOccurred()) + }() + } + + // step 1: make a member unhealthy by removing the etcd static pod manifest from the node, then stopping the kubelet on that node + framework.Logf("Removing the etcd static pod manifest from the node %s", etcdTargetNode.Name) + err = oc.AsAdmin().Run("debug").Args("-n", etcdNamespace, "node/"+etcdTargetNode.Name, "--", "chroot", "/host", "/bin/bash", "-c", "mkdir -p /var/lib/etcd-backup && mv /etc/kubernetes/manifests/etcd-pod.yaml /var/lib/etcd-backup").Execute() + err = errors.Wrapf(err, "unhealthy member setup: failed to remove etcd static pod manifest from the node %s", etcdTargetNode.Name) + o.Expect(err).ToNot(o.HaveOccurred()) + + err = 
+		err = e2epod.WaitForPodNotFoundInNamespace(ctx, kubeClient, etcdTargetPod.Name, etcdNamespace, 5*time.Minute)
+		err = errors.Wrapf(err, "unhealthy member setup: timed out waiting for the etcd static pod %s on the node %s to fully terminate", etcdTargetPod.Name, etcdTargetNode.Name)
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		framework.Logf("Stopping the kubelet on the node %s", etcdTargetNode.Name)
+		err = scalingtestinglibrary.StopKubelet(ctx, oc.AdminKubeClient(), *etcdTargetNode)
+		err = errors.Wrapf(err, "unhealthy member setup: failed to stop the kubelet on the node %s", etcdTargetNode.Name)
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		isNodeNotReady := e2enode.WaitForNodeToBeNotReady(ctx, kubeClient, etcdTargetNode.Name, 5*time.Minute)
+		o.Expect(isNodeNotReady).To(o.BeTrue(), fmt.Sprintf("unhealthy member setup: timed out waiting for the node %s to be NotReady", etcdTargetNode.Name))
+
+		// step 2: delete the machine hosting the node that has the unhealthy member
+		machineToDeleteName, err := scalingtestinglibrary.NodeNameToMachineName(ctx, kubeClient, machineClient, etcdTargetNode.Name)
+		err = errors.Wrapf(err, "deletion setup: failed to get the machine name for the NotReady node: %s", etcdTargetNode.Name)
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		machineToDelete, err := machineClient.Get(ctx, machineToDeleteName, metav1.GetOptions{})
+		err = errors.Wrapf(err, "deletion setup: failed to retrieve the machine: %s", machineToDeleteName)
+		o.Expect(err).ToNot(o.HaveOccurred())
+
+		err = scalingtestinglibrary.DeleteMachine(ctx, g.GinkgoT(), machineClient, machineToDeleteName)
+		o.Expect(err).ToNot(o.HaveOccurred())
+		framework.Logf("Deleted machine %q", machineToDeleteName)
+
+		// step 3: wait for the machine pending deletion to have its member removed, which indicates that scale-down happens first when a member is unhealthy
+ framework.Logf("Waiting for etcd member %q to be removed", etcdTargetNode.Name) + err = scalingtestinglibrary.EnsureMemberRemoved(g.GinkgoT(), etcdClientFactory, etcdTargetNode.Name) + err = errors.Wrapf(err, "scale-down: timed out waiting for member (%v) to be removed", etcdTargetNode.Name) + o.Expect(err).ToNot(o.HaveOccurred()) + + // wait for apiserver revision rollout to stabilize + framework.Logf("Waiting for api servers to stabilize on the same revision") + err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver")) + err = errors.Wrap(err, "scale-down: timed out waiting for APIServer pods to stabilize on the same revision") + o.Expect(err).ToNot(o.HaveOccurred()) + + framework.Logf("Waiting for etcd membership to show 2 voting members to verify scale-down happens first when the deleted member is unhealthy") + err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 2) + err = errors.Wrap(err, "scale-down: timed out waiting for 2 voting members in the etcd cluster and etcd-endpoints configmap") + o.Expect(err).ToNot(o.HaveOccurred()) + + // step 4: restore to original state and observe scale-up, by creating a new machine that is a copy of the deleted machine + newMachineName, err := scalingtestinglibrary.CreateNewMasterMachine(ctx, g.GinkgoT(), machineClient, machineToDelete) + o.Expect(err).ToNot(o.HaveOccurred()) + framework.Logf("Created machine %q", newMachineName) + + err = scalingtestinglibrary.EnsureMasterMachine(ctx, g.GinkgoT(), newMachineName, machineClient) + err = errors.Wrapf(err, "scale-up: timed out waiting for machine (%s) to become Running", newMachineName) + o.Expect(err).ToNot(o.HaveOccurred()) + + // wait for apiserver revision rollout to stabilize + framework.Logf("Waiting for api servers to stabilize on the same revision") + err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver")) + err = errors.Wrap(err, "scale-up: timed out waiting for APIServer pods to stabilize on the same revision") + o.Expect(err).ToNot(o.HaveOccurred()) + + // verify member and machine counts go back up to 3 + framework.Logf("Waiting for etcd membership to show 3 voting members") + err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 3) + err = errors.Wrap(err, "scale-up: timed out waiting for 3 voting members in the etcd cluster and etcd-endpoints configmap") + o.Expect(err).ToNot(o.HaveOccurred()) + + framework.Logf("Waiting for 3 ready replicas on CPMS") + err = scalingtestinglibrary.EnsureReadyReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient, nodeClient) + err = errors.Wrap(err, "scale-up: timed out waiting for CPMS to show 3 ready replicas") + o.Expect(err).ToNot(o.HaveOccurred()) + + framework.Logf("Waiting for 3 Running master machines") + err = scalingtestinglibrary.EnsureMasterMachinesAndCount(ctx, g.GinkgoT(), machineClient) + err = errors.Wrap(err, "scale-up: timed out waiting for only 3 Running master machines") + o.Expect(err).ToNot(o.HaveOccurred()) + + framework.Logf("Waiting for CPMS replicas to converge") + err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient) + o.Expect(err).ToNot(o.HaveOccurred()) + }) }) diff --git a/test/extended/util/annotate/generated/zz_generated.annotations.go b/test/extended/util/annotate/generated/zz_generated.annotations.go index a7a4c6a72484..af4883d0f28e 100644 --- 
+++ b/test/extended/util/annotate/generated/zz_generated.annotations.go
@@ -1141,6 +1141,8 @@ var Annotations = map[string]string{
 
 	"[sig-etcd][Feature:DisasterRecovery][Suite:openshift/etcd/recovery][Timeout:30m] [Feature:EtcdRecovery][Disruptive] Restore snapshot from node on another single unhealthy node": " [Serial]",
 
+	"[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd is able to vertically scale down when a member is unhealthy [apigroup:machine.openshift.io]": "",
+
 	"[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd is able to vertically scale up and down when CPMS is disabled [apigroup:machine.openshift.io]": "",
 
 	"[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd is able to vertically scale up and down with a single node [Timeout:60m][apigroup:machine.openshift.io]": "",