Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
build:
hack/build-go.sh
.PHONY: build

test:
go test ./...
.PHONY: test
9 changes: 3 additions & 6 deletions lib/resourcebuilder/apiext.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package resourcebuilder

import (
"context"
"time"

"github.com/golang/glog"

Expand Down Expand Up @@ -49,15 +48,13 @@ func (b *crdBuilder) Do(ctx context.Context) error {
return err
}
if updated {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
return waitForCustomResourceDefinitionCompletion(ctxWithTimeout, 1*time.Second, b.client, crd)
return waitForCustomResourceDefinitionCompletion(ctx, b.client, crd)
}
return nil
}

func waitForCustomResourceDefinitionCompletion(ctx context.Context, interval time.Duration, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd *apiextv1beta1.CustomResourceDefinition) error {
return wait.PollImmediateUntil(interval, func() (bool, error) {
func waitForCustomResourceDefinitionCompletion(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd *apiextv1beta1.CustomResourceDefinition) error {
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
c, err := client.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
// exit early to recreate the crd.
Expand Down
22 changes: 6 additions & 16 deletions lib/resourcebuilder/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package resourcebuilder
import (
"context"
"fmt"
"time"

"github.com/golang/glog"

Expand Down Expand Up @@ -50,14 +49,12 @@ func (b *deploymentBuilder) Do(ctx context.Context) error {
return err
}
if updated && actual.Generation > 1 {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
return waitForDeploymentCompletion(ctxWithTimeout, 1*time.Second, b.client, deployment)
return waitForDeploymentCompletion(ctx, b.client, deployment)
}
return nil
}
func waitForDeploymentCompletion(ctx context.Context, interval time.Duration, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
return wait.PollImmediateUntil(interval, func() (bool, error) {
func waitForDeploymentCompletion(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
d, err := client.Deployments(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
// exit early to recreate the deployment.
Expand Down Expand Up @@ -114,20 +111,13 @@ func (b *daemonsetBuilder) Do(ctx context.Context) error {
return err
}
if updated && actual.Generation > 1 {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
return waitForDaemonsetRollout(ctxWithTimeout, 1*time.Second, b.client, daemonset)
return waitForDaemonsetRollout(ctx, b.client, daemonset)
}
return nil
}

const (
daemonsetPollInterval = 1 * time.Second
daemonsetPollTimeout = 5 * time.Minute
)

func waitForDaemonsetRollout(ctx context.Context, interval time.Duration, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
return wait.PollImmediateUntil(interval, func() (bool, error) {
func waitForDaemonsetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
d, err := client.DaemonSets(daemonset.Namespace).Get(daemonset.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
// exit early to recreate the daemonset.
Expand Down
16 changes: 5 additions & 11 deletions lib/resourcebuilder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package resourcebuilder
import (
"context"
"fmt"
"time"

"github.com/golang/glog"

Expand Down Expand Up @@ -39,7 +38,7 @@ func (b *jobBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
return b
}

func (b *jobBuilder) Do(_ context.Context) error {
func (b *jobBuilder) Do(ctx context.Context) error {
job := resourceread.ReadJobV1OrDie(b.raw)
if b.modifier != nil {
b.modifier(job)
Expand All @@ -49,19 +48,14 @@ func (b *jobBuilder) Do(_ context.Context) error {
return err
}
if updated {
return WaitForJobCompletion(b.client, job)
return WaitForJobCompletion(ctx, b.client, job)
}
return nil
}

const (
jobPollInterval = 1 * time.Second
jobPollTimeout = 5 * time.Minute
)

// WaitForJobCompletion waits for job to complete.
func WaitForJobCompletion(client batchclientv1.JobsGetter, job *batchv1.Job) error {
return wait.Poll(jobPollInterval, jobPollTimeout, func() (bool, error) {
func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
j, err := client.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
if err != nil {
glog.Errorf("error getting Job %s: %v", job.Name, err)
Expand All @@ -84,5 +78,5 @@ func WaitForJobCompletion(client batchclientv1.JobsGetter, job *batchv1.Job) err
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
}
return false, nil
})
}, ctx.Done())
}
5 changes: 5 additions & 0 deletions lib/resourcebuilder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -86,3 +87,7 @@ func New(mapper *ResourceMapper, rest *rest.Config, m lib.Manifest) (Interface,
}
return f(rest, m), nil
}

// defaultObjectPollInterval is the default interval to poll the API to determine whether an object
// is ready. Use this when a more specific interval is not necessary.
const defaultObjectPollInterval = 3 * time.Second
5 changes: 3 additions & 2 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,10 @@ func New(
}

// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
func (optr *Operator) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer optr.queue.ShutDown()
stopCh := ctx.Done()

glog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
defer glog.Info("Shutting down ClusterVersionOperator")
Expand All @@ -215,7 +216,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(16, stopCh)
go optr.configSync.Start(ctx, 16)

go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
Expand Down
36 changes: 27 additions & 9 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ func setupCVOTest() (*Operator, map[string]runtime.Object, *fake.Clientset, *dyn

func TestCVO_StartupAndSync(t *testing.T) {
o, cvs, client, _, shutdownFn := setupCVOTest()
stopCh := make(chan struct{})
defer close(stopCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer shutdownFn()
worker := o.configSync.(*SyncWorker)
go worker.Start(1, stopCh)
go worker.Start(ctx, 1)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
Expand Down Expand Up @@ -328,8 +330,10 @@ func TestCVO_StartupAndSync(t *testing.T) {

func TestCVO_RestartAndReconcile(t *testing.T) {
o, cvs, client, _, shutdownFn := setupCVOTest()
stopCh := make(chan struct{})
defer close(stopCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer shutdownFn()
worker := o.configSync.(*SyncWorker)

Expand Down Expand Up @@ -393,7 +397,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
// Step 2: Start the sync worker and verify the sequence of events, and then verify
// the status does not change
//
go worker.Start(1, stopCh)
go worker.Start(ctx, 1)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down Expand Up @@ -487,8 +491,10 @@ func TestCVO_RestartAndReconcile(t *testing.T) {

func TestCVO_ErrorDuringReconcile(t *testing.T) {
o, cvs, client, _, shutdownFn := setupCVOTest()
stopCh := make(chan struct{})
defer close(stopCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer shutdownFn()
worker := o.configSync.(*SyncWorker)
b := newBlockingResourceBuilder()
Expand Down Expand Up @@ -549,7 +555,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {

// Step 2: Start the sync worker and verify the sequence of events
//
go worker.Start(1, stopCh)
go worker.Start(ctx, 1)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down Expand Up @@ -616,7 +622,19 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
// Step 6: Send an error, then verify it shows up in status
//
b.Send(fmt.Errorf("unable to proceed"))

go func() {
for len(b.ch) != 0 {
time.Sleep(time.Millisecond)
}
cancel()
for len(b.ch) == 0 || len(worker.StatusCh()) == 0 {
time.Sleep(time.Millisecond)
}
}()

//
// verify we see the update after the context times out
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Reconciling: true,
Expand Down
8 changes: 1 addition & 7 deletions pkg/cvo/internal/operatorstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,7 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
if b.modifier != nil {
b.modifier(os)
}
timeout := 1 * time.Minute
if b.mode == resourcebuilder.InitializingMode {
timeout = 6 * time.Minute
}
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode)
return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode)
}

func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator, mode resourcebuilder.Mode) error {
Expand Down
46 changes: 37 additions & 9 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/rest"
clientgotesting "k8s.io/client-go/testing"
Expand All @@ -29,8 +30,9 @@ import (

func Test_SyncWorker_apply(t *testing.T) {
tests := []struct {
manifests []string
reactors map[action]error
manifests []string
reactors map[action]error
cancelAfter int

check func(*testing.T, []action)
wantErr bool
Expand Down Expand Up @@ -61,10 +63,10 @@ func Test_SyncWorker_apply(t *testing.T) {
}

if got, exp := actions[0], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
}
if got, exp := actions[1], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, "default", "testb")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
}
},
}, {
Expand All @@ -89,15 +91,16 @@ func Test_SyncWorker_apply(t *testing.T) {
reactors: map[action]error{
newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa"): &meta.NoResourceMatchError{},
},
wantErr: true,
cancelAfter: 2,
wantErr: true,
check: func(t *testing.T, actions []action) {
if len(actions) != 3 {
spew.Dump(actions)
t.Fatalf("unexpected %d actions", len(actions))
}

if got, exp := actions[0], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
}
},
}}
Expand All @@ -120,15 +123,40 @@ func Test_SyncWorker_apply(t *testing.T) {
testMapper.AddToMap(resourcebuilder.Mapper)

worker := &SyncWorker{}
worker.backoff.Steps = 3
worker.builder = NewResourceBuilder(nil, nil, nil)
ctx := context.Background()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker.builder = &cancelAfterErrorBuilder{
builder: worker.builder,
cancel: cancel,
remainingErrors: test.cancelAfter,
}

worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
test.check(t, r.actions)
})
}
}

type cancelAfterErrorBuilder struct {
builder payload.ResourceBuilder
cancel func()
remainingErrors int
}

func (b *cancelAfterErrorBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error {
err := b.builder.Apply(ctx, m, state)
if err != nil {
if b.remainingErrors == 0 {
b.cancel()
} else {
b.remainingErrors--
}
}
return err
}

func Test_SyncWorker_apply_generic(t *testing.T) {
tests := []struct {
manifests []string
Expand Down Expand Up @@ -357,7 +385,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
return ch
}

func (r *fakeSyncRecorder) Start(maxWorkers int, stopCh <-chan struct{}) {}
func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int) {}

func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus {
r.Updates = append(r.Updates, desired)
Expand Down
Loading