111 changes: 111 additions & 0 deletions test/extended/etcd/helpers/helpers.go
@@ -15,9 +15,11 @@ import (
machineclient "github.com/openshift/client-go/machine/clientset/versioned"
machinev1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"

bmhelper "github.com/openshift/origin/test/extended/baremetal"
exutil "github.com/openshift/origin/test/extended/util"
"github.com/openshift/origin/test/extended/util/image"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,6 +32,7 @@ import (
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/utils/pointer"
)
@@ -597,6 +600,114 @@ func MachineNameToEtcdMemberName(ctx context.Context, kubeClient kubernetes.Inte
return "", fmt.Errorf("unable to find a node for the corresponding %q machine on the following machine's IPs: %v, checked: %v", machineName, machineIPListSet.List(), nodeNames)
}

// MasterNodes returns a list of master nodes
func MasterNodes(ctx context.Context, kubeClient kubernetes.Interface) []*corev1.Node {
masterNodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: masterNodeRoleLabel})
o.Expect(err).NotTo(o.HaveOccurred())

var nodes []*corev1.Node
for i := range masterNodes.Items {
node := &masterNodes.Items[i]
nodes = append(nodes, node)
}
return nodes
}

// NodeNameToMachineName finds the machine name that corresponds to the given node name.
// It first looks up a machine that matches the node's ProviderID field.
// In case the ProviderID is empty, it falls back to matching on the node's internal IP addresses.
//
// Note: it returns an error in case no matching machine was found.
func NodeNameToMachineName(ctx context.Context, kubeClient kubernetes.Interface, machineClient machinev1beta1client.MachineInterface, nodeName string) (string, error) {
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return "", err
}

masterMachines, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
if err != nil {
return "", err
}

if node.Spec.ProviderID != "" {
// case 1: find corresponding machine, match on providerID
var machineNames []string
for _, masterMachine := range masterMachines.Items {
if pointer.StringDeref(masterMachine.Spec.ProviderID, "") == node.Spec.ProviderID {
return masterMachine.Name, nil
}
machineNames = append(machineNames, masterMachine.Name)
}

return "", fmt.Errorf("unable to find a machine for the corresponding %q node on ProviderID: %v, checked: %v", nodeName, node.Spec.ProviderID, machineNames)
}

// case 2: match on an internal ip address
nodeIPListSet := sets.NewString()
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
nodeIPListSet.Insert(addr.Address)
}
}

var machineNames []string
for _, masterMachine := range masterMachines.Items {
for _, addr := range masterMachine.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if nodeIPListSet.Has(addr.Address) {
return masterMachine.Name, nil
}
}
}
machineNames = append(machineNames, masterMachine.Name)
}

return "", fmt.Errorf("unable to find a machine for the correspoding %q node on the following nodes IP: %v, checked: %v", nodeName, nodeIPListSet.List(), machineNames)
}

// StopKubelet stops the kubelet on the given node by spawning a privileged pod on that node which runs `systemctl stop kubelet` from a chroot into the host.
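// The pod shares the host PID namespace and network and mounts the host root filesystem at /host so that systemctl can reach
// systemd on the node; the short sleep before the stop command presumably gives the container a moment to start up and report
// status before the kubelet goes down. The caller then waits until the pod's Ready condition is False, which is treated as the
// signal that the kubelet has stopped.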
func StopKubelet(ctx context.Context, adminKubeClient kubernetes.Interface, node corev1.Node) error {
podSpec := applycorev1.PodSpec().WithRestartPolicy(corev1.RestartPolicyNever).WithHostNetwork(true).WithHostPID(true)
podSpec.Containers = []applycorev1.ContainerApplyConfiguration{
*applycorev1.Container().
WithName("kubelet-stopper").
WithSecurityContext(applycorev1.SecurityContext().WithPrivileged(true).WithRunAsUser(0)).
WithImage(image.ShellImage()).
WithVolumeMounts(applycorev1.VolumeMount().WithName("host").WithMountPath("/host")).
WithCommand("/bin/sh").
WithArgs("-c", "chroot /host /bin/sh -c 'sleep 1 && systemctl stop kubelet'"),
}
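// Pin the pod to the target node and tolerate all taints so it can still be scheduled on a NotReady master.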
podSpec.NodeSelector = map[string]string{"kubernetes.io/hostname": node.Labels["kubernetes.io/hostname"]}
podSpec.Tolerations = []applycorev1.TolerationApplyConfiguration{*applycorev1.Toleration().WithOperator(corev1.TolerationOpExists)}
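// Mount the host's root filesystem so the container can chroot into /host.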
podSpec.Volumes = []applycorev1.VolumeApplyConfiguration{
*applycorev1.Volume().WithName("host").WithHostPath(applycorev1.HostPathVolumeSource().WithPath("/").WithType("Directory")),
}

pod := applycorev1.Pod("kubelet-stopper", "openshift-etcd").WithSpec(podSpec)
createdPod, err := adminKubeClient.CoreV1().Pods(*pod.Namespace).Apply(ctx, pod, metav1.ApplyOptions{FieldManager: *pod.Name})
if err != nil {
return fmt.Errorf("error applying pod %w", err)
}

err = wait.PollUntilContextTimeout(ctx, 15*time.Second, 5*time.Minute, true, func(ctx context.Context) (done bool, err error) {
p, err := adminKubeClient.CoreV1().Pods("openshift-etcd").Get(ctx, createdPod.Name, metav1.GetOptions{})
if err != nil {
return true, fmt.Errorf("error retreving the pod %s : %w", createdPod.Name, err)
}

if p.Status.Phase == corev1.PodFailed {
return true, fmt.Errorf("pod failed with status %v", p.Status)
}

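// Once the kubelet is stopped the pod can no longer be kept Ready, so a Ready condition of False is treated as success here.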
podReadyCondition := e2epod.FindPodConditionByType(&p.Status, corev1.PodReady)
if podReadyCondition == nil {
return false, nil
}

return podReadyCondition.Status == corev1.ConditionFalse, nil
})
return err
}

func InitPlatformSpecificConfiguration(oc *exutil.CLI) func() {
SkipIfUnsupportedPlatform(context.TODO(), oc)

127 changes: 126 additions & 1 deletion test/extended/etcd/vertical_scaling.go
@@ -2,6 +2,8 @@ package etcd

import (
"context"
"fmt"
"time"

g "github.com/onsi/ginkgo/v2"
o "github.com/onsi/gomega"
@@ -11,14 +13,16 @@ import (
machinev1 "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
testlibraryapi "github.com/openshift/library-go/test/library/apiserver"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"

scalingtestinglibrary "github.com/openshift/origin/test/extended/etcd/helpers"
exutil "github.com/openshift/origin/test/extended/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
)

var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd", func() {
@@ -293,4 +297,125 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
o.Expect(err).ToNot(o.HaveOccurred())
})

// The following test covers a vertical scaling scenario when a member is unhealthy.
// This test validates that scale-down happens before scale-up if the deleted member is unhealthy.
// CPMS is disabled to observe that scale-down happens first in this case.
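// (With the CPMS active, the operator would immediately create a replacement machine, which would obscure the ordering being verified.)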
//
// 1) If the CPMS is active, first disable it by deleting the CPMS custom resource.
// 2) Remove the static pod manifest from a node and stop the kubelet on the node. This makes the member unhealthy.
// 3) Delete the machine hosting the node in step 2.
// 4) Verify the member removal and the total voting member count of 2 to ensure scale-down happens first when a member is unhealthy.
// 5) Restore the initial cluster state by creating a new machine (scale-up) and re-enabling the CPMS.
g.It("is able to vertically scale down when a member is unhealthy [apigroup:machine.openshift.io]", func() {
framework.Logf("This test validates that scale-down happens before scale up if the deleted member is unhealthy")

etcdNamespace := "openshift-etcd"
etcdTargetNode := scalingtestinglibrary.MasterNodes(ctx, kubeClient)[0]

// retrieve the etcd pod on etcdTargetNode; it will go away once the static pod manifest is removed from the node
etcdPods, err := kubeClient.CoreV1().Pods(etcdNamespace).List(ctx, metav1.ListOptions{LabelSelector: "app=etcd", FieldSelector: "spec.nodeName=" + etcdTargetNode.Name})
err = errors.Wrapf(err, "pre-test: failed to retrieve the etcd pod on the node %s", etcdTargetNode.Name)
o.Expect(err).ToNot(o.HaveOccurred())
o.Expect(etcdPods.Items).ToNot(o.BeEmpty(), "pre-test: expected to find an etcd pod on the node %s", etcdTargetNode.Name)
etcdTargetPod := etcdPods.Items[0]

if cpmsActive {
// step 0: disable the CPMS
framework.Logf("Disable the CPMS")
err := scalingtestinglibrary.DisableCPMS(ctx, g.GinkgoT(), cpmsClient)
err = errors.Wrap(err, "pre-test: failed to disable the CPMS")
o.Expect(err).ToNot(o.HaveOccurred())

// re-enable CPMS after the test
defer func() {
framework.Logf("Re-enable the CPMS")
err := scalingtestinglibrary.EnableCPMS(ctx, g.GinkgoT(), cpmsClient)
err = errors.Wrap(err, "post-test: failed to re-enable the CPMS")
o.Expect(err).ToNot(o.HaveOccurred())
}()
}

// step 1: make a member unhealthy by removing the etcd static pod manifest from the node, then stopping the kubelet on that node
framework.Logf("Removing the etcd static pod manifest from the node %s", etcdTargetNode.Name)
err = oc.AsAdmin().Run("debug").Args("-n", etcdNamespace, "node/"+etcdTargetNode.Name, "--", "chroot", "/host", "/bin/bash", "-c", "mkdir -p /var/lib/etcd-backup && mv /etc/kubernetes/manifests/etcd-pod.yaml /var/lib/etcd-backup").Execute()
err = errors.Wrapf(err, "unhealthy member setup: failed to remove etcd static pod manifest from the node %s", etcdTargetNode.Name)
o.Expect(err).ToNot(o.HaveOccurred())

err = e2epod.WaitForPodNotFoundInNamespace(ctx, kubeClient, etcdTargetPod.Name, etcdNamespace, 5*time.Minute)
err = errors.Wrapf(err, "unhealthy member setup: timed-out waiting for etcd static pod %s on the node %s, to fully terminate", etcdTargetPod.Name, etcdTargetNode.Name)
o.Expect(err).ToNot(o.HaveOccurred())

framework.Logf("Stopping the kubelet on the node %s", etcdTargetNode.Name)
err = scalingtestinglibrary.StopKubelet(ctx, oc.AdminKubeClient(), *etcdTargetNode)
err = errors.Wrapf(err, "unhealthy member setup: failed to stop the kubelet on the node %s", etcdTargetNode.Name)
o.Expect(err).ToNot(o.HaveOccurred())

isNodeNotReady := e2enode.WaitForNodeToBeNotReady(ctx, kubeClient, etcdTargetNode.Name, 5*time.Minute)
o.Expect(isNodeNotReady).To(o.BeTrue(), fmt.Sprintf("unhealthy member setup: timed out waiting for the node %s to be NotReady", etcdTargetNode.Name))

// step 2: delete the machine hosting the node that has the unhealthy member
machineToDeleteName, err := scalingtestinglibrary.NodeNameToMachineName(ctx, kubeClient, machineClient, etcdTargetNode.Name)
err = errors.Wrapf(err, "deletion setup: failed to get the machine name for the NotReady node: %s", etcdTargetNode.Name)
o.Expect(err).ToNot(o.HaveOccurred())

machineToDelete, err := machineClient.Get(ctx, machineToDeleteName, metav1.GetOptions{})
err = errors.Wrapf(err, "deletion setup: failed to retrieve the machine: %s", machineToDeleteName)
o.Expect(err).ToNot(o.HaveOccurred())

err = scalingtestinglibrary.DeleteMachine(ctx, g.GinkgoT(), machineClient, machineToDeleteName)
o.Expect(err).ToNot(o.HaveOccurred())
framework.Logf("Deleted machine %q", machineToDeleteName)

// step 3: wait for the machine pending deletion to have its member removed to indicate scale-down happens first when a member is unhealthy.
framework.Logf("Waiting for etcd member %q to be removed", etcdTargetNode.Name)
err = scalingtestinglibrary.EnsureMemberRemoved(g.GinkgoT(), etcdClientFactory, etcdTargetNode.Name)
err = errors.Wrapf(err, "scale-down: timed out waiting for member (%v) to be removed", etcdTargetNode.Name)
o.Expect(err).ToNot(o.HaveOccurred())

// wait for apiserver revision rollout to stabilize
framework.Logf("Waiting for api servers to stabilize on the same revision")
err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
err = errors.Wrap(err, "scale-down: timed out waiting for APIServer pods to stabilize on the same revision")
o.Expect(err).ToNot(o.HaveOccurred())

framework.Logf("Waiting for etcd membership to show 2 voting members to verify scale-down happens first when the deleted member is unhealthy")
err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 2)
err = errors.Wrap(err, "scale-down: timed out waiting for 2 voting members in the etcd cluster and etcd-endpoints configmap")
o.Expect(err).ToNot(o.HaveOccurred())

// step 4: restore the original cluster state and observe scale-up by creating a new machine that is a copy of the deleted one
newMachineName, err := scalingtestinglibrary.CreateNewMasterMachine(ctx, g.GinkgoT(), machineClient, machineToDelete)
o.Expect(err).ToNot(o.HaveOccurred())
framework.Logf("Created machine %q", newMachineName)

err = scalingtestinglibrary.EnsureMasterMachine(ctx, g.GinkgoT(), newMachineName, machineClient)
err = errors.Wrapf(err, "scale-up: timed out waiting for machine (%s) to become Running", newMachineName)
o.Expect(err).ToNot(o.HaveOccurred())

// wait for apiserver revision rollout to stabilize
framework.Logf("Waiting for api servers to stabilize on the same revision")
err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
err = errors.Wrap(err, "scale-up: timed out waiting for APIServer pods to stabilize on the same revision")
o.Expect(err).ToNot(o.HaveOccurred())

// verify member and machine counts go back up to 3
framework.Logf("Waiting for etcd membership to show 3 voting members")
err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 3)
err = errors.Wrap(err, "scale-up: timed out waiting for 3 voting members in the etcd cluster and etcd-endpoints configmap")
o.Expect(err).ToNot(o.HaveOccurred())

framework.Logf("Waiting for 3 ready replicas on CPMS")
err = scalingtestinglibrary.EnsureReadyReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient, nodeClient)
err = errors.Wrap(err, "scale-up: timed out waiting for CPMS to show 3 ready replicas")
o.Expect(err).ToNot(o.HaveOccurred())

framework.Logf("Waiting for 3 Running master machines")
err = scalingtestinglibrary.EnsureMasterMachinesAndCount(ctx, g.GinkgoT(), machineClient)
err = errors.Wrap(err, "scale-up: timed out waiting for only 3 Running master machines")
o.Expect(err).ToNot(o.HaveOccurred())

framework.Logf("Waiting for CPMS replicas to converge")
err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
o.Expect(err).ToNot(o.HaveOccurred())
})
})
