Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 50 additions & 15 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,12 +580,15 @@ func (w *statusWrapper) ValidPayloadStatus(release configv1.Release) bool {
return equalDigest(w.previousStatus.loadPayloadStatus.Release.Image, release.Image)
}

// ReportPayload reports payload load status.
func (w *statusWrapper) ReportPayload(payloadStatus LoadPayloadStatus) {
status := w.previousStatus
status.loadPayloadStatus = payloadStatus
w.w.updateStatus(*status)
w.w.updateLoadStatus(*status)
}

// Report reports payload application status. It does not overwrite payload load status and capabilities status
// since payload application does not update these statuses they could therefore be out-of-date.
func (w *statusWrapper) Report(status SyncWorkerStatus) {
p := w.previousStatus
var fractionComplete float32
Expand All @@ -612,7 +615,7 @@ func (w *statusWrapper) Report(status SyncWorkerStatus) {
} else if status.Generation < p.Generation {
klog.Warningf("Received a Generation(%d) lower than previously known Generation(%d), this is most probably an internal error", status.Generation, p.Generation)
}
w.w.updateStatus(status)
w.w.updateApplyStatus(status)
}

// calculateNext updates the passed work object with the desired next state and
Expand Down Expand Up @@ -711,14 +714,48 @@ func equalSyncWork(a, b *SyncWork, context string) (equalVersion, equalOverrides
return sameVersion, sameOverrides, sameCapabilities
}

// updateStatus records the current status of the sync action for observation
// by others. It sends a copy of the update to the report channel for improved
// testability.
func (w *SyncWorker) updateStatus(update SyncWorkerStatus) {
// updateApplyStatus records the current status of the payload apply sync action for
// observation by others. It sends a copy of the update to the report channel for improved
// testability. It sets Generation, Failure, Done, Total, Completed, Reconciling, Initial,
// VersionHash, LastProgress, Actual, and Verified statuses which are manged by the payload
// apply sync action.
func (w *SyncWorker) updateApplyStatus(update SyncWorkerStatus) {
w.lock.Lock()
defer w.lock.Unlock()

klog.V(6).Infof("Status change %#v", update)
// do not overwrite these status values which are not managed by apply
update.loadPayloadStatus = w.status.loadPayloadStatus
update.CapabilitiesStatus = w.status.CapabilitiesStatus

klog.V(6).Infof("Payload apply status change %#v", update)
w.status = update
select {
case w.report <- update:
default:
if klog.V(6).Enabled() {
klog.Infof("Status report channel was full %#v", update)
}
}
}

// updateLoadStatus records the current status of the payload load sync action for
// observation by others. It sends a copy of the update to the report channel for improved
// testability. It sets Generation, Reconciling, Actual, Verified, payload load, and
// capabilities statuses which are manged by the payload load sync action.
func (w *SyncWorker) updateLoadStatus(update SyncWorkerStatus) {
w.lock.Lock()
defer w.lock.Unlock()

// do not overwrite these status values which are not managed by load
update.Failure = w.status.Failure
update.Done = w.status.Done
update.Total = w.status.Total
update.Completed = w.status.Completed
update.Initial = w.status.Initial
update.VersionHash = w.status.VersionHash
update.LastProgress = w.status.LastProgress

klog.V(6).Infof("Payload load status change %#v", update)
w.status = update
select {
case w.report <- update:
Expand Down Expand Up @@ -761,14 +798,12 @@ func (w *SyncWorker) apply(ctx context.Context, work *SyncWork, maxWorkers int,
total := len(payloadUpdate.Manifests)
cr := &consistentReporter{
status: SyncWorkerStatus{
Generation: work.Generation,
Initial: work.State.Initializing(),
Reconciling: work.State.Reconciling(),
VersionHash: payloadUpdate.ManifestHash,
Actual: payloadUpdate.Release,
Verified: payloadUpdate.VerifiedImage,
loadPayloadStatus: w.status.loadPayloadStatus,
CapabilitiesStatus: w.status.CapabilitiesStatus,
Generation: work.Generation,
Initial: work.State.Initializing(),
Reconciling: work.State.Reconciling(),
VersionHash: payloadUpdate.ManifestHash,
Actual: payloadUpdate.Release,
Verified: payloadUpdate.VerifiedImage,
},
completed: work.Completed,
version: payloadUpdate.Release.Version,
Expand Down