3 changes: 2 additions & 1 deletion cmd/openshift-tests/e2e.go
@@ -106,7 +106,8 @@ var staticSuites = testSuites{
return strings.Contains(name, "[Feature:EtcdRecovery]") || strings.Contains(name, "[Feature:NodeRecovery]") || isStandardEarlyTest(name)

},
TestTimeout: 60 * time.Minute,
// Duration of the quorum restore test exceeds 60 minutes.
TestTimeout: 90 * time.Minute,
SyntheticEventTests: ginkgo.JUnitForEventsFunc(synthetictests.SystemEventInvariants),
},
PreSuite: suiteWithProviderPreSuite,
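For context, the timeout bump lands in a staticSuites entry whose Matches function selects the [Feature:EtcdRecovery] and [Feature:NodeRecovery] tests. A minimal sketch of what the full entry might look like follows; the suite name and any field not visible in the hunk above are assumptions for illustration, not taken from this diff.

	// Sketch of a staticSuites entry. Only Matches, TestTimeout,
	// SyntheticEventTests, and PreSuite are confirmed by the hunk above;
	// the suite name and struct nesting are assumed.
	{
		TestSuite: ginkgo.TestSuite{
			Name: "openshift/etcd/recovery", // assumed name
			Matches: func(name string) bool {
				return strings.Contains(name, "[Feature:EtcdRecovery]") ||
					strings.Contains(name, "[Feature:NodeRecovery]") ||
					isStandardEarlyTest(name)
			},
			// Duration of the quorum restore test exceeds 60 minutes.
			TestTimeout:         90 * time.Minute,
			SyntheticEventTests: ginkgo.JUnitForEventsFunc(synthetictests.SystemEventInvariants),
		},
		PreSuite: suiteWithProviderPreSuite,
	},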
19 changes: 19 additions & 0 deletions test/extended/dr/backup_restore.go
@@ -13,8 +13,10 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/test/e2e/framework"

exutil "github.com/openshift/origin/test/extended/util"
@@ -287,3 +289,20 @@ func waitForReadyEtcdPods(client kubernetes.Interface, masterCount int) {
40*time.Minute,
)
}

func waitForPodsTolerateClientTimeout(c corev1client.PodInterface, label labels.Selector, predicate func(corev1.Pod) bool, count int, timeout time.Duration) {
err := wait.Poll(10*time.Second, timeout, func() (bool, error) {
p, e := exutil.GetPodNamesByFilter(c, label, predicate)
if e != nil {
framework.Logf("Saw an error waiting for etcd pods to become available: %v", e)
// TODO tolerate transient etcd timeout only and fail other errors
return false, nil
}
if len(p) != count {
framework.Logf("Only %d of %d expected pods are ready", len(p), count)
return false, nil
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
}
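For reference, a minimal sketch of how the new helper might be invoked, mirroring the waitForReadyEtcdPods wrapper visible in the hunk above. The etcd namespace, label selector, and Running-phase predicate are assumptions for illustration, not part of this diff.

	// Hypothetical caller: wait until masterCount etcd pods are Running,
	// tolerating transient client-side errors while the API server recovers.
	func waitForRunningEtcdPods(client kubernetes.Interface, masterCount int) {
		waitForPodsTolerateClientTimeout(
			client.CoreV1().Pods("openshift-etcd"),
			labels.SelectorFromSet(labels.Set{"app": "etcd"}), // label assumed
			func(pod corev1.Pod) bool { return pod.Status.Phase == corev1.PodRunning },
			masterCount,
			40*time.Minute,
		)
	}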
41 changes: 27 additions & 14 deletions test/extended/dr/force_redeploy.go
@@ -103,50 +103,63 @@ func forceOperandRedeployment(client operatorv1client.OperatorV1Interface) {
},
},
}

// Initiate redeployment of all operands without waiting so that
// redeployment is performed in parallel.
latestRevisions := []int32{}
for _, client := range clients {
// Retrieve the LatestAvailableRevision before rolling out to know
// what revision not to look for in the subsequent check for
// rollout success.
latestRevision := latestAvailableRevision(client)
latestRevisions = append(latestRevisions, latestRevision)

forceRedeployOperand(client)
}

// Wait for redeployment of operands
for i, client := range clients {
latestRevision := latestRevisions[i]
g.By(fmt.Sprintf("Waiting for %s to be updated on all nodes to a revision greater than %d", client, latestRevision))
waitForRollout(client, latestRevision)
framework.Logf("Rollout complete for %s", client)
}
}

// forceRedeployOperand initiates redeployment of an operand and waits for a
// successful rollout.
func forceRedeployOperand(client *operatorConfigClient) {
// Retrieve the LatestAvailableRevision before rolling out to know
// what revision not to look for in the subsequent check for
// rollout success.
func latestAvailableRevision(client *operatorConfigClient) int32 {
g.By(fmt.Sprintf("Finding LatestAvailableRevision for %s", client))
var latestAvailableRevision int32
var revision int32
err := wait.PollImmediate(redeployWaitInterval, redeployWaitTimeout, func() (done bool, err error) {
status, err := client.getStatus(context.Background(), "cluster", metav1.GetOptions{})
if err != nil {
framework.Logf("Error retrieving %s operator status: %v", client, err)
} else {
latestAvailableRevision = status.LatestAvailableRevision
revision = status.LatestAvailableRevision
}
return err == nil, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
framework.Logf("LatestAvailableRevision for %s is %d", client, latestAvailableRevision)
framework.Logf("LatestAvailableRevision for %s is %d", client, revision)
return revision
}

// forceRedeployOperand initiates forced redeployment of an operand.
func forceRedeployOperand(client *operatorConfigClient) {
// Use a unique forceRedeploymentReason for each test run so that a
// rollout is always triggered, even when running repeatedly against
// the same cluster (as when debugging).
reason := fmt.Sprintf("e2e-cluster-restore-%s", uuid.NewUUID())

g.By(fmt.Sprintf("Forcing redeployment of %s", client))
data := fmt.Sprintf(`{"spec": {"forceRedeploymentReason": "%s"}}`, reason)
err = wait.PollImmediate(redeployWaitInterval, redeployWaitTimeout, func() (done bool, err error) {
err := wait.PollImmediate(redeployWaitInterval, redeployWaitTimeout, func() (done bool, err error) {
err = client.patch(context.Background(), "cluster", types.MergePatchType, []byte(data), metav1.PatchOptions{})
if err != nil {
framework.Logf("Error patching %s operator status to set redeploy reason: %v", client, err)
}
return err == nil, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

g.By(fmt.Sprintf("Waiting for %s to be updated on all nodes to a revision greater than %d", client, latestAvailableRevision))
waitForRollout(client, latestAvailableRevision)
framework.Logf("Rollout complete for %s", client)
}

// waitForRollout waits for an operator status to indicate that all nodes are
177 changes: 73 additions & 104 deletions test/extended/dr/quorum_restore.go
@@ -3,25 +3,20 @@ package dr
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"strings"
"time"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/test/e2e/framework"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
@@ -56,9 +51,9 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {

oc := exutil.NewCLIWithoutNamespace("disaster-recovery")

// Validate backing up and restoring to the same node on a cluster
// that has lost quorum after the backup was taken.
g.It("[Feature:EtcdRecovery] Cluster should restore itself after quorum loss", func() {
e2eskipper.Skipf("Test is disabled pending a fix https://github.com/openshift/origin/pull/25774")

config, err := framework.LoadConfig()
o.Expect(err).NotTo(o.HaveOccurred())
dynamicClient := dynamic.NewForConfigOrDie(config)
@@ -104,6 +99,12 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
survivingMachine, err := ms.Get(context.Background(), survivingMachineName, metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

// The backup script only supports taking a backup of a cluster
// that still has quorum, so the backup must be performed before
// quorum-destroying machine deletion.
framework.Logf("Perform etcd backup on node %s (machine %s) while quorum still exists", survivingNodeName, survivingMachineName)
execOnNodeOrFail(survivingNode, "sudo -i /bin/bash -cx 'rm -rf /home/core/backup && /usr/local/bin/cluster-backup.sh /home/core/backup'")

framework.Logf("Destroy %d masters", len(masters)-1)
var masterMachines []string
for _, node := range masters {
@@ -118,6 +119,13 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
err = ms.Delete(context.Background(), masterMachine, metav1.DeleteOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
}

// All API calls for the remainder of the test should be performed in
// a polling loop to insure against transient failures. API calls can
// only be assumed to succeed without polling when the cluster is
// healthy, and that will not be the case again until this function
// exits successfully.

pollConfig := rest.CopyConfig(config)
pollConfig.Timeout = 5 * time.Second
pollClient, err := kubernetes.NewForConfig(pollConfig)
@@ -147,24 +155,51 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
})
}

framework.Logf("Perform etcd backup on remaining machine %s (machine %s)", survivingNodeName, survivingMachineName)
// Need to supply --force to the backup script to avoid failing on the api check for progressing operators.
execOnNodeOrFail(survivingNode, "sudo -i /bin/bash -cx 'rm -rf /home/core/backup; /usr/local/bin/cluster-backup.sh --force ~core/backup'")
// Recovery 7
restoreFromBackup(survivingNode)

framework.Logf("Restore etcd and control-plane on remaining node %s (machine %s)", survivingNodeName, survivingMachineName)
execOnNodeOrFail(survivingNode, "sudo -i /bin/bash -cx '/usr/local/bin/cluster-restore.sh /home/core/backup'")
// Recovery 8
restartKubelet(survivingNode)

framework.Logf("Wait for API server to come up")
time.Sleep(30 * time.Second)
err = wait.Poll(30*time.Second, 30*time.Minute, func() (done bool, err error) {
nodes, err := pollClient.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{Limit: 2})
if err != nil || nodes.Items == nil {
framework.Logf("return false - err %v nodes.Items %v", err, nodes.Items)
return false, nil
// Recovery 9a, 9b
waitForAPIServer(oc.AdminKubeClient(), survivingNode)

// Restoring brings back machines and nodes deleted since the
// backup was taken. Those machines and nodes need to be removed
// before they can be created again.
//
// TODO(marun) Ensure the mechanics of node replacement around
// disaster recovery are documented.
for _, master := range masterMachines {
if master == survivingMachineName {
continue
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
err := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
framework.Logf("Initiating deletion of machine removed after the backup was taken: %s", master)
err := ms.Delete(context.Background(), master, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
framework.Logf("Error seen when attempting to remove restored machine %s: %v", master, err)
return false, nil
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
}
for _, node := range masters {
if node.Name == survivingNodeName {
continue
}
err := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
framework.Logf("Initiating deletion of node removed after the backup was taken: %s", node.Name)
err := oc.AdminKubeClient().CoreV1().Nodes().Delete(context.Background(), node.Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
framework.Logf("Error seen when attempting to remove restored node %s: %v", node.Name, err)
return false, nil
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
}

if expectedNumberOfMasters == 1 {
framework.Logf("Cannot create new masters, you must manually create masters and update their DNS entries according to the docs")
@@ -240,108 +275,42 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
o.Expect(err).NotTo(o.HaveOccurred())
}

framework.Logf("Force new revision of etcd-pod")
_, err = oc.AdminOperatorClient().OperatorV1().Etcds().Patch(context.Background(), "cluster", types.MergePatchType, []byte(`{"spec": {"forceRedeploymentReason": "recover-etcd"}}`), metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

framework.Logf("Force new revision of kube-apiserver")
_, err = oc.AdminOperatorClient().OperatorV1().KubeAPIServers().Patch(context.Background(), "cluster", types.MergePatchType, []byte(`{"spec": {"forceRedeploymentReason": "recover-kube-apiserver"}}`), metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
// Recovery 10,11,12
forceOperandRedeployment(oc.AdminOperatorClient().OperatorV1())

// Recovery 13
waitForReadyEtcdPods(oc.AdminKubeClient(), expectedNumberOfMasters)

scaleEtcdQuorum(pollClient, expectedNumberOfMasters)
// Scale quorum guard in a polling loop to ensure tolerance for disruption
err = wait.Poll(10*time.Second, 5*time.Minute, func() (bool, error) {
err := scaleEtcdQuorum(pollClient, int32(expectedNumberOfMasters))
if err != nil {
framework.Logf("Saw an error attempting to scale etcd quorum guard: %v", err)
return false, nil
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

// Workaround for https://bugzilla.redhat.com/show_bug.cgi?id=1707006#
// SDN won't switch to Degraded mode when service is down after disaster recovery
// restartSDNPods(oc)
waitForMastersToUpdate(oc, mcps)
waitForOperatorsToSettle()
})
},
)
})

func waitForPodsTolerateClientTimeout(c corev1client.PodInterface, label labels.Selector, predicate func(corev1.Pod) bool, count int, timeout time.Duration) {
err := wait.Poll(10*time.Second, timeout, func() (bool, error) {
p, e := exutil.GetPodNamesByFilter(c, label, predicate)
if e != nil {
framework.Logf("Saw an error waiting for etcd pods to become available: %v", e)
// TODO tolerate transient etcd timeout only and fail other errors
return false, nil
}
if len(p) != count {
return false, nil
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
}

func scaleEtcdQuorum(client kubernetes.Interface, replicas int) error {
func scaleEtcdQuorum(client kubernetes.Interface, replicas int32) error {
etcdQGScale, err := client.AppsV1().Deployments("openshift-etcd").GetScale(context.Background(), "etcd-quorum-guard", metav1.GetOptions{})
if err != nil {
return err
}
if etcdQGScale.Spec.Replicas == int32(replicas) {
if etcdQGScale.Spec.Replicas == replicas {
return nil
}
framework.Logf("Scale etcd-quorum-guard to %d replicas", replicas)
etcdQGScale.Spec.Replicas = int32(replicas)
etcdQGScale.Spec.Replicas = replicas
_, err = client.AppsV1().Deployments("openshift-etcd").UpdateScale(context.Background(), "etcd-quorum-guard", etcdQGScale, metav1.UpdateOptions{})
if err != nil {
return err
}

etcdQGScale, err = client.AppsV1().Deployments("openshift-etcd").GetScale(context.Background(), "etcd-quorum-guard", metav1.GetOptions{})
if err != nil {
return err
}
o.Expect(etcdQGScale.Spec.Replicas).To(o.Equal(int32(replicas)))
return nil
}

func getPullSecret(oc *exutil.CLI) string {
framework.Logf("Saving image pull secret")
//TODO: copy of test/extended/operators/images.go, move this to a common func
imagePullSecret, err := oc.KubeFramework().ClientSet.CoreV1().Secrets("openshift-config").Get(context.Background(), "pull-secret", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
if err != nil {
framework.Failf("unable to get pull secret for cluster: %v", err)
}

// cache file to local temp location
imagePullFile, err := ioutil.TempFile("", "image-pull-secret")
if err != nil {
framework.Failf("unable to create a temporary file: %v", err)
}

// write the content
imagePullSecretBytes := imagePullSecret.Data[".dockerconfigjson"]
if _, err := imagePullFile.Write(imagePullSecretBytes); err != nil {
framework.Failf("unable to write pull secret to temp file: %v", err)
}
if err := imagePullFile.Close(); err != nil {
framework.Failf("unable to close file: %v", err)
}
framework.Logf("Image pull secret: %s", imagePullFile.Name())
return imagePullFile.Name()
}

func getImagePullSpecFromRelease(oc *exutil.CLI, imagePullSecretPath, imageName string) string {
var image string
err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
location, err := oc.Run("adm", "release", "info").Args("--image-for", imageName, "--registry-config", imagePullSecretPath).Output()
if err != nil {
framework.Logf("Unable to find release info, retrying: %v", err)
return false, nil
}
image = location
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
return image
return err
}

func getMachineNameByNodeName(oc *exutil.CLI, name string) string {
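The restored quorum test now delegates several recovery steps to helpers (restoreFromBackup, restartKubelet, waitForAPIServer) whose definitions are not shown in this diff view. Purely as a reading aid, a plausible shape for two of them is sketched below, inferred from the inline calls they replace; the parameter type and the kubelet restart command are assumptions.

	// Sketch only, not taken from this PR; signatures and commands are assumed.
	func restoreFromBackup(node *corev1.Node) {
		// Mirrors the removed inline call to the restore script (Recovery step 7).
		execOnNodeOrFail(node, "sudo -i /bin/bash -cx '/usr/local/bin/cluster-restore.sh /home/core/backup'")
	}

	func restartKubelet(node *corev1.Node) {
		// Assumed command; the documented recovery flow restarts the kubelet
		// on the surviving node after the restore script runs (Recovery step 8).
		execOnNodeOrFail(node, "sudo -i /bin/bash -cx 'systemctl restart kubelet.service'")
	}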