Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
Steps: 3,
},
optr.exclude,
optr.eventRecorder,
)

return nil
Expand Down
8 changes: 5 additions & 3 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"

"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
dynamicfake "k8s.io/client-go/dynamic/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

configv1 "github.com/openshift/api/config/v1"
Expand Down Expand Up @@ -77,6 +77,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
client: client,
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
eventRecorder: record.NewFakeRecorder(100),
}

dynamicScheme := runtime.NewScheme()
Expand All @@ -92,6 +93,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
Steps: 1,
},
"exclude-test",
record.NewFakeRecorder(100),
)
o.configSync = worker

Expand Down
4 changes: 4 additions & 0 deletions pkg/cvo/cvo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
kfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
ktesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

Expand Down Expand Up @@ -2259,6 +2260,7 @@ func TestOperator_sync(t *testing.T) {
}
optr.configSync = &fakeSyncRecorder{Returns: expectStatus}
}
optr.eventRecorder = record.NewFakeRecorder(100)

err := optr.sync(optr.queueKey())
if err != nil && tt.wantErr == nil {
Expand Down Expand Up @@ -2626,6 +2628,7 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
optr.proxyLister = &clientProxyLister{client: optr.client}
optr.coLister = &clientCOLister{client: optr.client}
optr.cvLister = &clientCVLister{client: optr.client}
optr.eventRecorder = record.NewFakeRecorder(100)

if tt.handler != nil {
s := httptest.NewServer(http.HandlerFunc(tt.handler))
Expand Down Expand Up @@ -3129,6 +3132,7 @@ func TestOperator_upgradeableSync(t *testing.T) {
optr.coLister = &clientCOLister{client: optr.client}
optr.cvLister = &clientCVLister{client: optr.client}
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
optr.eventRecorder = record.NewFakeRecorder(100)

err := optr.upgradeableSync(optr.queueKey())
if err != nil && tt.wantErr == nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/cvo/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
dto "github.com/prometheus/client_model/go"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"

configv1 "github.com/openshift/api/config/v1"
)
Expand Down Expand Up @@ -512,6 +513,7 @@ func Test_operatorMetrics_Collect(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.optr.eventRecorder = record.NewFakeRecorder(100)
if tt.optr.cvLister == nil {
tt.optr.cvLister = &cvLister{}
}
Expand Down Expand Up @@ -588,7 +590,8 @@ func Test_operatorMetrics_CollectTransitions(t *testing.T) {
},
},
optr: &Operator{
coLister: &coLister{},
coLister: &coLister{},
eventRecorder: record.NewFakeRecorder(100),
},
wants: func(t *testing.T, metrics []prometheus.Metric) {
if len(metrics) != 5 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cvo/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/tools/record"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/client-go/config/clientset/versioned/fake"
Expand Down Expand Up @@ -184,6 +185,7 @@ func TestOperator_syncFailingStatus(t *testing.T) {
},
},
),
eventRecorder: record.NewFakeRecorder(100),
},
wantErr: func(t *testing.T, err error) {
if err == nil || err.Error() != "bad" {
Expand Down
6 changes: 4 additions & 2 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"testing"

"k8s.io/client-go/tools/record"

"github.com/davecgh/go-spew/spew"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -125,7 +127,7 @@ func Test_SyncWorker_apply(t *testing.T) {
testMapper.RegisterGVK(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, newTestBuilder(r, test.reactors))
testMapper.AddToMap(resourcebuilder.Mapper)

worker := &SyncWorker{}
worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
worker.builder = NewResourceBuilder(nil, nil, nil)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -311,7 +313,7 @@ func Test_SyncWorker_apply_generic(t *testing.T) {
dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicScheme)

up := &payload.Update{ReleaseImage: "test", ReleaseVersion: "v0.0.0", Manifests: manifests}
worker := &SyncWorker{}
worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
worker.backoff.Steps = 1
worker.builder = &testResourceBuilder{
client: dynamicClient,
Expand Down
28 changes: 20 additions & 8 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"k8s.io/klog"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"

Expand Down Expand Up @@ -131,6 +132,7 @@ type SyncWorker struct {
retriever PayloadRetriever
builder payload.ResourceBuilder
preconditions precondition.List
eventRecorder record.EventRecorder

// minimumReconcileInterval is the minimum time between reconcile attempts, and is
// used to define the maximum backoff interval when syncOnce() returns an error.
Expand All @@ -157,11 +159,12 @@ type SyncWorker struct {

// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker {
return &SyncWorker{
retriever: retriever,
builder: builder,
backoff: backoff,
retriever: retriever,
builder: builder,
backoff: backoff,
eventRecorder: eventRecorder,

minimumReconcileInterval: reconcileInterval,

Expand All @@ -178,8 +181,8 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
// NewSyncWorkerWithPreconditions initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
// It allows providing preconditions for loading payload.
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude)
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude, eventRecorder)
worker.preconditions = preconditions
return worker
}
Expand Down Expand Up @@ -479,6 +482,8 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
validPayload := w.payload
if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, configv1.Update{Image: update.Image}) {
klog.V(4).Infof("Loading payload")
cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"}
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", update.Version, update.Image)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have the version and image as structured data on the event. Is that possible, or are flat strings all we have available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have the version and image as structured data on the event. Is that possible, or are flat strings all we have available?

To my knowledge, flat strings are what you have.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we could do that as annotation, but let's not for now.

reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Step: "RetrievePayload",
Expand All @@ -488,6 +493,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
})
info, err := w.retriever.RetrievePayload(ctx, update)
if err != nil {
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", update.Version, update.Image, err)
reporter.Report(SyncWorkerStatus{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we could move the event recorder under reporter.Report?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we could move the event recorder under reporter.Report?

that would be a significant change. It's used for all error calls and I'm not certain of the fanout that would cause. I'd like to solve the immediate "special" events and allow someone closer to the operator decide if and how to emit an event for each report.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's keep reporter separate for now.

Generation: work.Generation,
Failure: err,
Expand All @@ -499,8 +505,10 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
return err
}

w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", update.Version, update.Image)
payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude)
if err != nil {
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", update.Version, update.Image, err)
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Failure: err,
Expand Down Expand Up @@ -532,7 +540,9 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion}, clusterVersion)); err != nil {
if update.Force {
klog.V(4).Infof("Forcing past precondition failures: %s", err)
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsForced", "preconditions forced for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err)
} else {
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsFailed", "preconditions failed for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err)
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Failure: err,
Expand All @@ -545,9 +555,11 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
return err
}
}
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", update.Version, update.Image)
}

w.payload = payloadUpdate
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", update.Version, update.Image)
klog.V(4).Infof("Payload loaded from %s with hash %s", payloadUpdate.ReleaseImage, payloadUpdate.ManifestHash)
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/cvo/sync_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"k8s.io/client-go/tools/record"

configv1 "github.com/openshift/api/config/v1"
)

Expand Down Expand Up @@ -77,7 +79,7 @@ func Test_statusWrapper_ReportProgress(t *testing.T) {
w := &statusWrapper{
previousStatus: &tt.previous,
}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)}
w.Report(tt.next)
close(w.w.report)
if tt.want {
Expand Down Expand Up @@ -130,7 +132,7 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
w := &statusWrapper{
previousStatus: &tt.previous,
}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)}
w.Report(tt.next)
close(w.w.report)

Expand Down
9 changes: 5 additions & 4 deletions pkg/start/start_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
randutil "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"
Expand Down Expand Up @@ -240,7 +241,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
options.PayloadOverride = filepath.Join(dir, "ignored")
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -392,7 +393,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
options.ResyncInterval = 3 * time.Second
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -497,7 +498,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
options.NodeName = "test-node"
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

lock, err := createResourceLock(cb, ns, ns)
Expand Down Expand Up @@ -669,7 +670,7 @@ metadata:
t.Fatal(err)
}

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
Expand Down