diff --git a/Makefile b/Makefile
new file mode 100644
index 000000000..3246b6d6e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,7 @@
+build:
+	hack/build-go.sh
+.PHONY: build
+
+test:
+	go test ./...
+.PHONY: test
diff --git a/lib/resourcebuilder/apiext.go b/lib/resourcebuilder/apiext.go
index 8ab83d268..acc177c68 100644
--- a/lib/resourcebuilder/apiext.go
+++ b/lib/resourcebuilder/apiext.go
@@ -2,7 +2,6 @@ package resourcebuilder
 
 import (
 	"context"
-	"time"
 
 	"github.com/golang/glog"
 
@@ -49,15 +48,13 @@ func (b *crdBuilder) Do(ctx context.Context) error {
 		return err
 	}
 	if updated {
-		ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
-		defer cancel()
-		return waitForCustomResourceDefinitionCompletion(ctxWithTimeout, 1*time.Second, b.client, crd)
+		return waitForCustomResourceDefinitionCompletion(ctx, b.client, crd)
 	}
 	return nil
 }
 
-func waitForCustomResourceDefinitionCompletion(ctx context.Context, interval time.Duration, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd *apiextv1beta1.CustomResourceDefinition) error {
-	return wait.PollImmediateUntil(interval, func() (bool, error) {
+func waitForCustomResourceDefinitionCompletion(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd *apiextv1beta1.CustomResourceDefinition) error {
+	return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
 		c, err := client.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{})
 		if errors.IsNotFound(err) {
 			// exit early to recreate the crd.
diff --git a/lib/resourcebuilder/apps.go b/lib/resourcebuilder/apps.go
index 79afb9ffb..f64e8bca8 100644
--- a/lib/resourcebuilder/apps.go
+++ b/lib/resourcebuilder/apps.go
@@ -3,7 +3,6 @@ package resourcebuilder
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/golang/glog"
 
@@ -50,14 +49,12 @@ func (b *deploymentBuilder) Do(ctx context.Context) error {
 		return err
 	}
 	if updated && actual.Generation > 1 {
-		ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
-		defer cancel()
-		return waitForDeploymentCompletion(ctxWithTimeout, 1*time.Second, b.client, deployment)
+		return waitForDeploymentCompletion(ctx, b.client, deployment)
 	}
 	return nil
 }
 
-func waitForDeploymentCompletion(ctx context.Context, interval time.Duration, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
-	return wait.PollImmediateUntil(interval, func() (bool, error) {
+func waitForDeploymentCompletion(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
+	return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
 		d, err := client.Deployments(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
 		if errors.IsNotFound(err) {
 			// exit early to recreate the deployment.
@@ -114,20 +111,13 @@ func (b *daemonsetBuilder) Do(ctx context.Context) error {
 		return err
 	}
 	if updated && actual.Generation > 1 {
-		ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
-		defer cancel()
-		return waitForDaemonsetRollout(ctxWithTimeout, 1*time.Second, b.client, daemonset)
+		return waitForDaemonsetRollout(ctx, b.client, daemonset)
 	}
 	return nil
 }
 
-const (
-	daemonsetPollInterval = 1 * time.Second
-	daemonsetPollTimeout  = 5 * time.Minute
-)
-
-func waitForDaemonsetRollout(ctx context.Context, interval time.Duration, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
-	return wait.PollImmediateUntil(interval, func() (bool, error) {
+func waitForDaemonsetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
+	return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
 		d, err := client.DaemonSets(daemonset.Namespace).Get(daemonset.Name, metav1.GetOptions{})
 		if errors.IsNotFound(err) {
 			// exit early to recreate the daemonset.
diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go
index 843215caa..6737a1a15 100644
--- a/lib/resourcebuilder/batch.go
+++ b/lib/resourcebuilder/batch.go
@@ -3,7 +3,6 @@ package resourcebuilder
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/golang/glog"
 
@@ -39,7 +38,7 @@ func (b *jobBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
 	return b
 }
 
-func (b *jobBuilder) Do(_ context.Context) error {
+func (b *jobBuilder) Do(ctx context.Context) error {
 	job := resourceread.ReadJobV1OrDie(b.raw)
 	if b.modifier != nil {
 		b.modifier(job)
@@ -49,19 +48,14 @@ func (b *jobBuilder) Do(_ context.Context) error {
 		return err
 	}
 	if updated {
-		return WaitForJobCompletion(b.client, job)
+		return WaitForJobCompletion(ctx, b.client, job)
 	}
 	return nil
 }
 
-const (
-	jobPollInterval = 1 * time.Second
-	jobPollTimeout  = 5 * time.Minute
-)
-
 // WaitForJobCompletion waits for job to complete.
-func WaitForJobCompletion(client batchclientv1.JobsGetter, job *batchv1.Job) error {
-	return wait.Poll(jobPollInterval, jobPollTimeout, func() (bool, error) {
+func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
+	return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
 		j, err := client.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
 		if err != nil {
 			glog.Errorf("error getting Job %s: %v", job.Name, err)
@@ -84,5 +78,5 @@ func WaitForJobCompletion(client batchclientv1.JobsGetter, job *batchv1.Job) err
 			return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
 		}
 		return false, nil
-	})
+	}, ctx.Done())
 }
diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go
index a22c8a142..fb4598ee6 100644
--- a/lib/resourcebuilder/interface.go
+++ b/lib/resourcebuilder/interface.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -86,3 +87,7 @@ func New(mapper *ResourceMapper, rest *rest.Config, m lib.Manifest) (Interface,
 	}
 	return f(rest, m), nil
 }
+
+// defaultObjectPollInterval is the default interval to poll the API to determine whether an object
+// is ready. Use this when a more specific interval is not necessary.
+const defaultObjectPollInterval = 3 * time.Second
diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index e5325d0b3..1b416b0d8 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -198,9 +198,10 @@ func New(
 }
 
-// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
+// Run runs the cluster version operator until the provided context is cancelled. Workers is ignored for now.
+func (optr *Operator) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer optr.queue.ShutDown()
+	stopCh := ctx.Done()
 
 	glog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer glog.Info("Shutting down ClusterVersionOperator")
@@ -215,7 +216,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
 	// start the config sync loop, and have it notify the queue when new status is detected
 	go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(16, stopCh)
+	go optr.configSync.Start(ctx, 16)
 	go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh)
 	go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go
index 5d51359b8..503c2d43b 100644
--- a/pkg/cvo/cvo_scenarios_test.go
+++ b/pkg/cvo/cvo_scenarios_test.go
@@ -95,11 +95,13 @@ func setupCVOTest() (*Operator, map[string]runtime.Object, *fake.Clientset, *dyn
 
 func TestCVO_StartupAndSync(t *testing.T) {
 	o, cvs, client, _, shutdownFn := setupCVOTest()
-	stopCh := make(chan struct{})
-	defer close(stopCh)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	defer shutdownFn()
 	worker := o.configSync.(*SyncWorker)
-	go worker.Start(1, stopCh)
+	go worker.Start(ctx, 1)
 
 	// Step 1: Verify the CVO creates the initial Cluster Version object
 	//
@@ -328,8 +330,10 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
 	o, cvs, client, _, shutdownFn := setupCVOTest()
-	stopCh := make(chan struct{})
-	defer close(stopCh)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	defer shutdownFn()
 
 	worker := o.configSync.(*SyncWorker)
@@ -393,7 +397,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
 	// Step 2: Start the sync worker and verify the sequence of events, and then verify
 	// the status does not change
 	//
-	go worker.Start(1, stopCh)
+	go worker.Start(ctx, 1)
 
 	//
 	verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{
@@ -487,8 +491,10 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
 	o, cvs, client, _, shutdownFn := setupCVOTest()
-	stopCh := make(chan struct{})
-	defer close(stopCh)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	defer shutdownFn()
 
 	worker := o.configSync.(*SyncWorker)
 	b := newBlockingResourceBuilder()
@@ -549,7 +555,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
 	// Step 2: Start the sync worker and verify the sequence of events
 	//
-	go worker.Start(1, stopCh)
+	go worker.Start(ctx, 1)
 
 	//
 	verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{
@@ -616,7 +622,19 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
 	// Step 6: Send an error, then verify it shows up in status
 	//
 	b.Send(fmt.Errorf("unable to proceed"))
+
+	go func() {
+		for len(b.ch) != 0 {
+			time.Sleep(time.Millisecond)
+		}
+		cancel()
+		for len(b.ch) == 0 || len(worker.StatusCh()) == 0 {
+			time.Sleep(time.Millisecond)
+		}
+	}()
+
 	//
+	// verify we see the update after the context times out
 	verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{
 		Reconciling: true,
diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go
index 2f4dd8e63..f145e6692 100644
--- a/pkg/cvo/internal/operatorstatus.go
+++ b/pkg/cvo/internal/operatorstatus.go
@@ -99,13 +99,7 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
 	if b.modifier != nil {
 		b.modifier(os)
 	}
-	timeout := 1 * time.Minute
-	if b.mode == resourcebuilder.InitializingMode {
-		timeout = 6 * time.Minute
-	}
-	ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
-	return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode)
+	return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode)
 }
 
 func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator, mode resourcebuilder.Mode) error {
diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go
index 0d1022422..d7ddefe0b 100644
--- a/pkg/cvo/sync_test.go
+++ b/pkg/cvo/sync_test.go
@@ -15,6 +15,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/diff"
 	dynamicfake "k8s.io/client-go/dynamic/fake"
 	"k8s.io/client-go/rest"
 	clientgotesting "k8s.io/client-go/testing"
@@ -29,8 +30,9 @@ import (
 
 func Test_SyncWorker_apply(t *testing.T) {
 	tests := []struct {
-		manifests []string
-		reactors  map[action]error
+		manifests   []string
+		reactors    map[action]error
+		cancelAfter int
 
 		check   func(*testing.T, []action)
 		wantErr bool
@@ -61,10 +63,10 @@ func Test_SyncWorker_apply(t *testing.T) {
 			}
 
 			if got, exp := actions[0], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
-				t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
+				t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
 			}
 			if got, exp := actions[1], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, "default", "testb")); !reflect.DeepEqual(got, exp) {
-				t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
+				t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
 			}
 		},
 	}, {
@@ -89,7 +91,8 @@ func Test_SyncWorker_apply(t *testing.T) {
 		reactors: map[action]error{
 			newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa"): &meta.NoResourceMatchError{},
 		},
-		wantErr: true,
+		cancelAfter: 2,
+		wantErr:     true,
 		check: func(t *testing.T, actions []action) {
 			if len(actions) != 3 {
 				spew.Dump(actions)
@@ -97,7 +100,7 @@ func Test_SyncWorker_apply(t *testing.T) {
 			}
 
 			if got, exp := actions[0], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
-				t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
+				t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
 			}
 		},
 	}}
@@ -120,15 +123,40 @@ func Test_SyncWorker_apply(t *testing.T) {
 			testMapper.AddToMap(resourcebuilder.Mapper)
 
 			worker := &SyncWorker{}
-			worker.backoff.Steps = 3
 			worker.builder = NewResourceBuilder(nil, nil, nil)
-			ctx := context.Background()
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			worker.builder = &cancelAfterErrorBuilder{
+				builder:         worker.builder,
+				cancel:          cancel,
+				remainingErrors: test.cancelAfter,
+			}
+
 			worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
 
 			test.check(t, r.actions)
 		})
 	}
 }
+
+type cancelAfterErrorBuilder struct {
+	builder         payload.ResourceBuilder
+	cancel          func()
+	remainingErrors int
+}
+
+func (b *cancelAfterErrorBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error {
+	err := b.builder.Apply(ctx, m, state)
+	if err != nil {
+		if b.remainingErrors == 0 {
+			b.cancel()
+		} else {
+			b.remainingErrors--
+		}
+	}
+	return err
+}
+
 func Test_SyncWorker_apply_generic(t *testing.T) {
 	tests := []struct {
 		manifests []string
@@ -357,7 +385,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
 	return ch
 }
 
-func (r *fakeSyncRecorder) Start(maxWorkers int, stopCh <-chan struct{}) {}
+func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int) {}
 
 func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus {
 	r.Updates = append(r.Updates, desired)
diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go
index 8f7b571c8..96e0f165e 100644
--- a/pkg/cvo/sync_worker.go
+++ b/pkg/cvo/sync_worker.go
@@ -24,7 +24,7 @@ import (
 
 // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing.
 type ConfigSyncWorker interface {
-	Start(maxWorkers int, stopCh <-chan struct{})
+	Start(ctx context.Context, maxWorkers int)
 	Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus
 	StatusCh() <-chan SyncWorkerStatus
 }
@@ -190,7 +190,7 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides
 // Start periodically invokes run, detecting whether content has changed.
 // It is edge-triggered when Update() is invoked and level-driven after the
 // syncOnce() has succeeded for a given input (we are said to be "reconciling").
-func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) {
+func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
 	glog.V(5).Infof("Starting sync worker")
 
 	work := &SyncWork{}
@@ -203,7 +203,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) {
 		for {
 			waitingToReconcile := work.State == payload.ReconcilingPayload
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				glog.V(5).Infof("Stopped worker")
 				return
 			case <-next:
@@ -229,7 +229,26 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) {
 
 			// actually apply the image, allowing for calls to be cancelled
 			err := func() error {
-				ctx, cancelFn := context.WithCancel(context.Background())
+
+				var syncTimeout time.Duration
+				switch work.State {
+				case payload.InitializingPayload:
+					// during initialization we expect things to fail due to ordering
+					// dependencies, so give it extra time
+					syncTimeout = w.minimumReconcileInterval * 5
+				case payload.UpdatingPayload:
+					// during updates we want to flag failures on any resources that take
+					// too long - for cluster operators that are not reporting failing, the
+					// error message will point users to which operator is upgrading
+					syncTimeout = w.minimumReconcileInterval * 2
+				default:
+					// TODO: make reconciling run in parallel, processing every resource
+					// once and accumulating errors, then reporting a summary of how
+					// much drift we found, and then we can turn down the timeout
+					syncTimeout = w.minimumReconcileInterval * 2
+				}
+				ctx, cancelFn := context.WithTimeout(ctx, syncTimeout)
+
 				w.lock.Lock()
 				w.cancelFn = cancelFn
 				w.lock.Unlock()
@@ -263,7 +282,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) {
 				work.State = payload.ReconcilingPayload
 				next = time.After(w.minimumReconcileInterval)
 			}
-	}, 10*time.Millisecond, stopCh)
+	}, 10*time.Millisecond, ctx.Done())
 
 	glog.V(5).Infof("Worker shut down")
 }
diff --git a/pkg/cvo/updatepayload.go b/pkg/cvo/updatepayload.go
index 147222eeb..d39828c78 100644
--- a/pkg/cvo/updatepayload.go
+++ b/pkg/cvo/updatepayload.go
@@ -159,7 +159,7 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
 	if err != nil {
 		return err
 	}
-	return resourcebuilder.WaitForJobCompletion(r.kubeClient.BatchV1(), job)
+	return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job)
 }
 
 // copyPayloadCmd returns command that copies cvo and release manifests from deafult location
diff --git a/pkg/payload/task.go b/pkg/payload/task.go
index c28a66abe..ae38983c7 100644
--- a/pkg/payload/task.go
+++ b/pkg/payload/task.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -50,42 +51,49 @@ func (st *Task) String() string {
 	return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, st.Manifest.Object().GetName(), st.Index, st.Total)
 }
 
+// Run attempts to create the provided object until it succeeds or the context is cancelled. It returns the
+// last error if the context is cancelled.
 func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error {
 	var lastErr error
-	if err := wait.ExponentialBackoff(st.Backoff, func() (bool, error) {
-		// run builder for the manifest
-		if err := builder.Apply(ctx, st.Manifest, state); err != nil {
-			utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st))
-			lastErr = err
-			metricPayloadErrors.WithLabelValues(version).Inc()
-			if !shouldRequeueApplyOnErr(err) {
-				return false, err
-			}
-			return false, nil
-		}
-		return true, nil
-	}); err != nil {
-		if uerr, ok := lastErr.(*UpdateError); ok {
-			return uerr
+	backoff := st.Backoff
+	maxDuration := 15 * time.Second // TODO: fold back into Backoff in 1.13
+	for {
+		// attempt the apply, waiting as long as necessary
+		err := builder.Apply(ctx, st.Manifest, state)
+		if err == nil {
+			return nil
 		}
-		reason, cause := reasonForPayloadSyncError(lastErr)
-		if len(cause) > 0 {
-			cause = ": " + cause
+
+		lastErr = err
+		utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st))
+		metricPayloadErrors.WithLabelValues(version).Inc()
+
+		// TODO: this code will become easier in Kube 1.13 because Backoff now supports max
+		d := time.Duration(float64(backoff.Duration) * backoff.Factor)
+		if d > maxDuration {
+			d = maxDuration
 		}
-		return &UpdateError{
-			Nested:  lastErr,
-			Reason:  reason,
-			Message: fmt.Sprintf("Could not update %s%s", st, cause),
+		d = wait.Jitter(d, backoff.Jitter)
+
+		// sleep or wait for cancellation
+		select {
+		case <-time.After(d):
+			continue
+		case <-ctx.Done():
+			if uerr, ok := lastErr.(*UpdateError); ok {
+				return uerr
+			}
+			reason, cause := reasonForPayloadSyncError(lastErr)
+			if len(cause) > 0 {
+				cause = ": " + cause
+			}
+			return &UpdateError{
+				Nested:  lastErr,
+				Reason:  reason,
+				Message: fmt.Sprintf("Could not update %s%s", st, cause),
+			}
 		}
 	}
-	return nil
-}
-
-func shouldRequeueApplyOnErr(err error) bool {
-	if apierrors.IsInvalid(err) {
-		return false
-	}
-	return true
 }
 
 // UpdateError is a wrapper for errors that occur during a payload sync.
diff --git a/pkg/start/start.go b/pkg/start/start.go
index d103a516f..3bf37af58 100644
--- a/pkg/start/start.go
+++ b/pkg/start/start.go
@@ -161,7 +161,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 		RetryPeriod:   retryPeriod,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: func(localCtx context.Context) {
-				controllerCtx.Start(ctx.Done())
+				controllerCtx.Start(ctx)
 				select {
 				case <-ctx.Done():
 					// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
@@ -339,11 +339,12 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 
 // Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
+// infrastructure. When the provided context is cancelled the controllers will be shut down.
-func (ctx *Context) Start(ch <-chan struct{}) {
-	go ctx.CVO.Run(2, ch)
-	if ctx.AutoUpdate != nil {
-		go ctx.AutoUpdate.Run(2, ch)
+func (c *Context) Start(ctx context.Context) {
+	ch := ctx.Done()
+	go c.CVO.Run(ctx, 2)
+	if c.AutoUpdate != nil {
+		go c.AutoUpdate.Run(2, ch)
 	}
-	ctx.CVInformerFactory.Start(ch)
-	ctx.InformerFactory.Start(ch)
+	c.CVInformerFactory.Start(ch)
+	c.InformerFactory.Start(ch)
 }
diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go
index 44e1f50f7..b16082171 100644
--- a/pkg/start/start_integration_test.go
+++ b/pkg/start/start_integration_test.go
@@ -233,9 +233,9 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	controllers.Start(stopCh)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	controllers.Start(ctx)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1")
@@ -379,14 +379,15 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 	options.ReleaseImage = payloadImage1
 	options.PayloadOverride = filepath.Join(dir, "ignored")
 	options.EnableMetrics = false
+	options.ResyncInterval = 3 * time.Second
 	controllers := options.NewControllerContext(cb)
-	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
+	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}).(*cvo.SyncWorker)
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	controllers.Start(stopCh)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	controllers.Start(ctx)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1")
@@ -660,9 +661,9 @@ metadata:
 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	controllers.Start(stopCh)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	controllers.Start(ctx)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1")
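
For reviewers less familiar with the cancellation pattern this change standardizes on, the sketch below is a minimal, standard-library-only illustration of the same shape used in `SyncWorker.Start` and `Task.Run` above: a long-lived context owns the worker's lifetime, each sync attempt runs under a derived `context.WithTimeout`, and failed attempts retry with a capped, jittered delay that also respects cancellation. The names `runWorker`, `syncOnce`, `attemptTimeout`, and `maxDelay` are illustrative only and do not come from this repository.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// runWorker loops until the parent context is cancelled. Each attempt runs
// under a derived timeout, and failures retry with a capped, jittered delay.
func runWorker(ctx context.Context, attemptTimeout, baseDelay, maxDelay time.Duration) error {
	delay := baseDelay
	for {
		// Derive a per-attempt deadline from the long-lived context.
		attemptCtx, cancel := context.WithTimeout(ctx, attemptTimeout)
		err := syncOnce(attemptCtx)
		cancel()
		if err == nil {
			return nil
		}

		// Cap the backoff and add jitter so retries do not synchronize.
		if delay = delay * 2; delay > maxDelay {
			delay = maxDelay
		}
		jittered := delay + time.Duration(rand.Int63n(int64(delay)/2+1))

		// Sleep, but give up immediately if the parent context is cancelled.
		select {
		case <-time.After(jittered):
		case <-ctx.Done():
			return fmt.Errorf("worker stopped: %w (last error: %v)", ctx.Err(), err)
		}
	}
}

// syncOnce stands in for a single reconcile attempt that honors its context.
func syncOnce(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond):
		return errors.New("apply failed")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	fmt.Println(runWorker(ctx, 500*time.Millisecond, 100*time.Millisecond, time.Second))
}
```

The design point the patch makes, and the sketch mirrors, is that callers never construct a fresh `context.Background()` deep inside the loop; every timeout is derived from the caller's context, so shutting down the operator cancels any in-flight apply or wait.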