diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 0f8f0a7a6..b0d94a6d9 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -3,6 +3,7 @@ package cvo import ( "context" "fmt" + "math/rand" "reflect" "sort" "strings" @@ -48,7 +49,12 @@ type SyncWork struct { Desired configv1.Update Overrides []configv1.ComponentOverride State payload.State - Completed int + + // Completed is the number of times in a row we have synced this payload + Completed int + // Attempt is incremented each time we attempt to sync a payload and reset + // when we change Generation/Desired. + Attempt int } // Empty returns true if the image is empty for this work. @@ -340,6 +346,14 @@ func (w *SyncWorker) calculateNext(work *SyncWork) bool { work.Completed = 0 } + // track how many times we have tried the current payload in the current + // state + if changed || w.work.State != work.State { + work.Attempt = 0 + } else { + work.Attempt++ + } + if w.work != nil { work.Desired = w.work.Desired work.Overrides = w.work.Overrides @@ -405,7 +419,7 @@ func (w *SyncWorker) Status() *SyncWorkerStatus { // the update could not be completely applied. The status is updated as we progress. // Cancelling the context will abort the execution of the sync. 
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { - glog.V(4).Infof("Running sync %s on generation %d in state %s", versionString(work.Desired), work.Generation, work.State) + glog.V(4).Infof("Running sync %s on generation %d in state %s at attempt %d", versionString(work.Desired), work.Generation, work.State, work.Attempt) update := work.Desired // cache the payload until the release image changes @@ -471,11 +485,23 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } graph := payload.NewTaskGraph(tasks) graph.Split(payload.SplitOnJobs) - if work.State == payload.InitializingPayload { + switch work.State { + case payload.InitializingPayload: + // create every component in parallel to maximize reaching steady + // state graph.Parallelize(payload.FlattenByNumberAndComponent) - // get the payload out via brute force maxWorkers = len(graph.Nodes) - } else { + case payload.ReconcilingPayload: + // run the graph in random order during reconcile so that we don't + // hang on any particular component - we seed from the number of + // times we've attempted this particular payload, so a particular + // payload always syncs in a reproducible order + r := rand.New(rand.NewSource(int64(work.Attempt))) + graph.Parallelize(payload.PermuteOrder(payload.FlattenByNumberAndComponent, r)) + maxWorkers = 2 + default: + // perform an orderly roll out by payload order, using some parallelization + // but avoiding out of order creation so components have some base graph.Parallelize(payload.ByNumberAndComponent) } diff --git a/pkg/payload/task_graph.go b/pkg/payload/task_graph.go index 6449ffecf..256f759c3 100644 --- a/pkg/payload/task_graph.go +++ b/pkg/payload/task_graph.go @@ -3,6 +3,7 @@ package payload import ( "context" "fmt" + "math/rand" "regexp" "sort" "strconv" @@ -259,10 +260,28 @@ func (g *TaskGraph) Split(onFn func(task *Task) bool) { } } +// BreakFunc returns the input tasks in 
order of dependencies with +// explicit parallelism allowed per task in an array of task nodes. +type BreakFunc func([]*Task) [][]*TaskNode + +// PermuteOrder returns a split function that ensures the order of +// each step is shuffled based on r. +func PermuteOrder(breakFn BreakFunc, r *rand.Rand) BreakFunc { +	return func(tasks []*Task) [][]*TaskNode { +		steps := breakFn(tasks) +		for _, stepTasks := range steps { +			r.Shuffle(len(stepTasks), func(i, j int) { +				stepTasks[i], stepTasks[j] = stepTasks[j], stepTasks[i] +			}) +		} +		return steps +	} +} + // Parallelize takes the given breakFn and splits any TaskNode's tasks up // into parallel groups. If breakFn returns an empty array or a single // array item with a single task node, that is considered a no-op. -func (g *TaskGraph) Parallelize(breakFn func([]*Task) [][]*TaskNode) { +func (g *TaskGraph) Parallelize(breakFn BreakFunc) { for i := 0; i < len(g.Nodes); i++ { node := g.Nodes[i] results := breakFn(node.Tasks) @@ -403,6 +422,9 @@ type taskStatus struct { success bool } +// RunGraph executes the provided graph in order and in parallel up to maxParallelism. It will not start +// a new TaskNode until all of the prerequisites have completed. If fn returns an error, no dependencies +// of that node will be executed, but other independent edges will continue executing. func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) []error { nestedCtx, cancelFn := context.WithCancel(ctx) defer cancelFn()