Merged
Changes from 1 commit
status: Report the operators that have not yet deployed
The current single-error return strategy from the CVO sync loop predates
parallel payload execution and limited retries. Instead, collect all
errors from the task execution graph and attempt to synthesize better
messages that describe what is actually happening.

1. Filter out cancellation error messages - they aren't useful and are
   a normal part of execution
2. When multiple errors are reported, display a reasonable multi-line
   error that summarizes any blockers
3. Treat ClusterOperatorNotAvailable as a special case - if all errors
   reported are of that type, collapse them into a single
   ClusterOperatorsNotAvailable error and synthesize a better message
   (see the sketch below)
4. In the sync loop, if we are still making progress towards the update
   goal, we haven't waited too long for the update, and the error is one
   of the cluster-operator-not-available types, report the condition
   Progressing=True with a synthetic message instead of Failing=True.

This also attaches the task to the UpdateError so that we can produce
more selective error messages for specific error cases.
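
A minimal, self-contained sketch of items 2 and 3 above (hypothetical names and types, not the actual CVO code): when every collected error is a ClusterOperatorNotAvailable, the set collapses into a single ClusterOperatorsNotAvailable summary; otherwise the messages are listed as a multi-line MultipleErrors blocker.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// updateError loosely mirrors the shape of payload.UpdateError for illustration only.
type updateError struct {
	Reason  string
	Message string
	Name    string
}

func (e *updateError) Error() string { return e.Message }

// summarize collapses the errors gathered from the task graph into one user-facing error.
func summarize(errs []*updateError) *updateError {
	names := make([]string, 0, len(errs))
	allNotAvailable := true
	for _, err := range errs {
		if err.Reason != "ClusterOperatorNotAvailable" {
			allNotAvailable = false
			break
		}
		names = append(names, err.Name)
	}
	if allNotAvailable && len(names) > 0 {
		// Every blocker is an operator that has not yet gone available: report one summary.
		sort.Strings(names)
		name := strings.Join(names, ", ")
		return &updateError{
			Reason:  "ClusterOperatorsNotAvailable",
			Message: fmt.Sprintf("Some cluster operators are still updating: %s", name),
			Name:    name,
		}
	}
	// Otherwise surface every message as a bullet so no blocker is hidden.
	messages := make([]string, 0, len(errs))
	for _, err := range errs {
		messages = append(messages, err.Error())
	}
	sort.Strings(messages)
	return &updateError{
		Reason:  "MultipleErrors",
		Message: "Multiple errors are preventing progress:\n* " + strings.Join(messages, "\n* "),
	}
}

func main() {
	errs := []*updateError{
		{Reason: "ClusterOperatorNotAvailable", Name: "monitoring", Message: "Cluster operator monitoring is still updating"},
		{Reason: "ClusterOperatorNotAvailable", Name: "image-registry", Message: "Cluster operator image-registry is still updating"},
	}
	out := summarize(errs)
	fmt.Println(out.Reason)  // ClusterOperatorsNotAvailable
	fmt.Println(out.Message) // Some cluster operators are still updating: image-registry, monitoring
}

The real implementation (summarizeTaskGraphErrors in pkg/cvo/sync_worker.go below) additionally filters out cancellation errors and de-duplicates identical messages before building the list.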
smarterclayton committed Apr 9, 2019
commit c2ac20fa172da73a873d3e3728053a30af9cf4dc
324 changes: 262 additions & 62 deletions pkg/cvo/cvo_scenarios_test.go

Large diffs are not rendered by default.

36 changes: 33 additions & 3 deletions pkg/cvo/status.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"time"

"github.com/golang/glog"

@@ -156,6 +157,8 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
now := metav1.Now()
version := versionString(status.Actual)

mergeOperatorHistory(config, status.Actual, now, status.Completed > 0)

// update validation errors
var reason string
if len(validationErrs) > 0 {
@@ -200,7 +203,9 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
})
}

if err := status.Failure; err != nil {
progressReason, progressShortMessage, skipFailure := convertErrorToProgressing(config.Status.History, now.Time, status)

if err := status.Failure; err != nil && !skipFailure {
var reason string
msg := "an error occurred"
if uErr, ok := err.(*payload.UpdateError); ok {
@@ -258,13 +263,19 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
switch {
case len(validationErrs) > 0:
message = fmt.Sprintf("Reconciling %s: the cluster version is invalid", version)
case status.Fraction > 0 && skipFailure:
reason = progressReason
message = fmt.Sprintf("Working towards %s: %.0f%% complete, %s", version, status.Fraction*100, progressShortMessage)
case status.Fraction > 0:
message = fmt.Sprintf("Working towards %s: %.0f%% complete", version, status.Fraction*100)
case status.Step == "RetrievePayload":
if len(reason) == 0 {
reason = "DownloadingUpdate"
}
message = fmt.Sprintf("Working towards %s: downloading update", version)
case skipFailure:
reason = progressReason
message = fmt.Sprintf("Working towards %s: %s", version, progressShortMessage)
default:
message = fmt.Sprintf("Working towards %s", version)
}
@@ -287,8 +298,6 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
})
}

mergeOperatorHistory(config, status.Actual, now, status.Completed > 0)

if glog.V(6) {
glog.Infof("Apply config: %s", diff.ObjectReflectDiff(original, config))
}
@@ -297,6 +306,27 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
return err
}

// convertErrorToProgressing returns true if the provided status indicates a failure condition that can be interpreted
// as still making internal progress. The general error we try to suppress is an operator or operators still being
// unavailable AND the general payload task making progress towards its goal. The failure is only suppressed while
// progress has been observed within the last 10 minutes and less than an hour has elapsed since the update began.
func convertErrorToProgressing(history []configv1.UpdateHistory, now time.Time, status *SyncWorkerStatus) (reason string, message string, ok bool) {
if len(history) == 0 || status.Failure == nil || status.Reconciling || status.LastProgress.IsZero() {
return "", "", false
}
if now.Sub(status.LastProgress) > 10*time.Minute || now.Sub(history[0].StartedTime.Time) > time.Hour {
return "", "", false
}
uErr, ok := status.Failure.(*payload.UpdateError)
if !ok {
return "", "", false
}
if uErr.Reason == "ClusterOperatorNotAvailable" || uErr.Reason == "ClusterOperatorsNotAvailable" {
return uErr.Reason, fmt.Sprintf("waiting on %s", uErr.Name), true
}
return "", "", false
}

// syncDegradedStatus handles generic errors in the cluster version. It tries to preserve
// all status fields that it can by using the provided config or loading the latest version
// from the cache (instead of clearing the status).
171 changes: 166 additions & 5 deletions pkg/cvo/sync_worker.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"

@@ -12,6 +14,7 @@ import (
"golang.org/x/time/rate"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"

@@ -67,6 +70,8 @@ type SyncWorkerStatus struct {
Initial bool
VersionHash string

LastProgress time.Time

Actual configv1.Update
}

@@ -304,6 +309,9 @@ func (w *statusWrapper) Report(status SyncWorkerStatus) {
}
}
}
if status.Fraction > p.Fraction || status.Completed > p.Completed || (status.Failure == nil && status.Actual != p.Actual) {
status.LastProgress = time.Now()
}
w.w.updateStatus(status)
}

@@ -471,7 +479,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}

// update each object
err := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
@@ -495,8 +503,8 @@ }
}
return nil
})
if err != nil {
cr.Error(err)
if len(errs) > 0 {
err := cr.Errors(errs)
return err
}

@@ -518,6 +526,12 @@ func init() {
)
}

type errCanceled struct {
err error
}

func (e errCanceled) Error() string { return e.err.Error() }

// consistentReporter hides the details of calculating the status based on the progress
// of the graph runner.
type consistentReporter struct {
@@ -553,14 +567,31 @@ func (r *consistentReporter) Error(err error) {
copied := r.status
copied.Step = "ApplyResources"
copied.Fraction = float32(r.done) / float32(r.total)
copied.Failure = err
if !isCancelledError(err) {
copied.Failure = err
}
r.reporter.Report(copied)
}

func (r *consistentReporter) Errors(errs []error) error {
err := summarizeTaskGraphErrors(errs)

r.lock.Lock()
defer r.lock.Unlock()
copied := r.status
copied.Step = "ApplyResources"
copied.Fraction = float32(r.done) / float32(r.total)
if err != nil {
copied.Failure = err
}
r.reporter.Report(copied)
return err
}

func (r *consistentReporter) CancelError() error {
r.lock.Lock()
defer r.lock.Unlock()
return fmt.Errorf("update was cancelled at %d/%d", r.done, r.total)
return errCanceled{fmt.Errorf("update was cancelled at %d/%d", r.done, r.total)}
}

func (r *consistentReporter) Complete() {
@@ -576,6 +607,136 @@ func (r *consistentReporter) Complete() {
r.reporter.Report(copied)
}

func isCancelledError(err error) bool {
if err == nil {
return false
}
_, ok := err.(errCanceled)
return ok
}

// summarizeTaskGraphErrors takes a set of errors returned by the execution of a graph and attempts
// to reduce them to a single cause or message. This is domain specific to the payload and our update
// algorithms. The return value is the summarized error, which may be nil if the provided errors are
// not truly errors (e.g. cancellation).
// TODO: take into account install vs upgrade
func summarizeTaskGraphErrors(errs []error) error {
// we ignore cancellation errors since they don't provide good feedback to users and are an internal
// detail of the server
err := errors.FilterOut(errors.NewAggregate(errs), isCancelledError)
if err == nil {
glog.V(4).Infof("All errors were cancellation errors: %v", errs)
return nil
}
agg, ok := err.(errors.Aggregate)
if !ok {
errs = []error{err}
} else {
errs = agg.Errors()
}

// log the errors to assist in debugging future summarization
if glog.V(4) {
glog.Infof("Summarizing %d errors", len(errs))
for _, err := range errs {
if uErr, ok := err.(*payload.UpdateError); ok {
if uErr.Task != nil {
glog.Infof("Update error %d/%d: %s %s (%T: %v)", uErr.Task.Index, uErr.Task.Total, uErr.Reason, uErr.Message, uErr.Nested, uErr.Nested)
} else {
glog.Infof("Update error: %s %s (%T: %v)", uErr.Reason, uErr.Message, uErr.Nested, uErr.Nested)
}
} else {
glog.Infof("Update error: %T: %v", err, err)
}
}
}

// collapse into a set of common errors where necessary
if len(errs) == 1 {
return errs[0]
}
if err := newClusterOperatorsNotAvailable(errs); err != nil {
return err
}
return newMultipleError(errs)
}

// newClusterOperatorsNotAvailable unifies multiple ClusterOperatorNotAvailable errors into
// a single error. It returns nil if the provided errors are not of the same type.
func newClusterOperatorsNotAvailable(errs []error) error {
names := make([]string, 0, len(errs))
for _, err := range errs {
uErr, ok := err.(*payload.UpdateError)
if !ok || uErr.Reason != "ClusterOperatorNotAvailable" {
return nil
}
if len(uErr.Name) > 0 {
names = append(names, uErr.Name)
}
}
if len(names) == 0 {
return nil
}

nested := make([]error, 0, len(errs))
Review comment (Contributor): ques: why create new nested error list?

for _, err := range errs {
nested = append(nested, err)
}
sort.Strings(names)
name := strings.Join(names, ", ")
return &payload.UpdateError{
Nested: errors.NewAggregate(errs),
Reason: "ClusterOperatorsNotAvailable",
Message: fmt.Sprintf("Some cluster operators are still updating: %s", name),
Name: name,
}
}

// uniqueStrings returns an array with all sequential identical items removed. It modifies the contents
// of arr. Sort the input array before calling to remove all duplicates.
func uniqueStrings(arr []string) []string {
var last int
for i := 1; i < len(arr); i++ {
if arr[i] == arr[last] {
continue
}
last++
if last != i {
arr[last] = arr[i]
}
}
if last < len(arr) {
last++
}
return arr[:last]
}
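// Illustrative usage (not from the original diff): newMultipleError sorts the
// messages first, so duplicates become adjacent and are dropped in a single pass:
//
//	msgs := []string{"timed out", "forbidden", "timed out"}
//	sort.Strings(msgs)          // ["forbidden", "timed out", "timed out"]
//	msgs = uniqueStrings(msgs)  // ["forbidden", "timed out"]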

// newMultipleError reports a generic set of errors that block progress. This method expects multiple
// errors but handles singular and empty arrays gracefully. If all errors have the same message, the
// first item is returned.
func newMultipleError(errs []error) error {
if len(errs) == 0 {
return nil
}
if len(errs) == 1 {
return errs[0]
}
messages := make([]string, 0, len(errs))
for _, err := range errs {
messages = append(messages, err.Error())
}
sort.Strings(messages)
messages = uniqueStrings(messages)
if len(messages) == 0 {
return errs[0]
}
return &payload.UpdateError{
Nested: errors.NewAggregate(errs),
Reason: "MultipleErrors",
Message: fmt.Sprintf("Multiple errors are preventing progress:\n* %s", strings.Join(messages, "\n* ")),
}
}

// getOverrideForManifest returns the override and true when override exists for manifest.
func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) {
for idx, ov := range overrides {