Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cvo
import (
"context"
"fmt"
"math/rand"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -48,7 +49,12 @@ type SyncWork struct {
Desired configv1.Update
Overrides []configv1.ComponentOverride
State payload.State
Completed int

// Completed is the number of times in a row we have synced this payload
Completed int
// Attempt is incremented each time we attempt to sync a payload and reset
// when we change Generation/Desired.
Attempt int
}

// Empty returns true if the image is empty for this work.
Expand Down Expand Up @@ -340,6 +346,14 @@ func (w *SyncWorker) calculateNext(work *SyncWork) bool {
work.Completed = 0
}

// track how many times we have tried the current payload in the current
// state
if changed || w.work.State != work.State {
work.Attempt = 0
} else {
work.Attempt++
}

if w.work != nil {
work.Desired = w.work.Desired
work.Overrides = w.work.Overrides
Expand Down Expand Up @@ -405,7 +419,7 @@ func (w *SyncWorker) Status() *SyncWorkerStatus {
// the update could not be completely applied. The status is updated as we progress.
// Cancelling the context will abort the execution of the sync.
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error {
glog.V(4).Infof("Running sync %s on generation %d in state %s", versionString(work.Desired), work.Generation, work.State)
glog.V(4).Infof("Running sync %s on generation %d in state %s at attempt %d", versionString(work.Desired), work.Generation, work.State, work.Attempt)
update := work.Desired

// cache the payload until the release image changes
Expand Down Expand Up @@ -471,11 +485,23 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}
graph := payload.NewTaskGraph(tasks)
graph.Split(payload.SplitOnJobs)
if work.State == payload.InitializingPayload {
switch work.State {
case payload.InitializingPayload:
// create every component in parallel to maximize reaching steady
// state
graph.Parallelize(payload.FlattenByNumberAndComponent)
// get the payload out via brute force
maxWorkers = len(graph.Nodes)
} else {
case payload.ReconcilingPayload:
// run the graph in random order during reconcile so that we don't
// hang on any particular component - we seed from the number of
// times we've attempted this particular payload, so a particular
// payload always syncs in a reproducible order
r := rand.New(rand.NewSource(int64(work.Attempt)))
graph.Parallelize(payload.PermuteOrder(payload.FlattenByNumberAndComponent, r))
maxWorkers = 2
default:
// perform an orderly roll out by payload order, using some parallelization
// but avoiding out of order creation so components have some base
graph.Parallelize(payload.ByNumberAndComponent)
}

Expand Down
24 changes: 23 additions & 1 deletion pkg/payload/task_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package payload
import (
"context"
"fmt"
"math/rand"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -259,10 +260,28 @@ func (g *TaskGraph) Split(onFn func(task *Task) bool) {
}
}

// BreakFunc returns the input tasks in order of dependencies with
// explicit parallelism allowed per task in an array of task nodes.
// It is the function type accepted by TaskGraph.Parallelize and both
// accepted and returned by PermuteOrder.
type BreakFunc func([]*Task) [][]*TaskNode

// PermuteOrder wraps breakFn in a new BreakFunc. The wrapper first lets
// breakFn divide the tasks into steps and then reorders the task nodes
// inside every step, in place, using randomness drawn from r. The order
// of the steps themselves is left as breakFn produced it.
func PermuteOrder(breakFn BreakFunc, r *rand.Rand) BreakFunc {
	// shuffleStep permutes a single step in place; steps are visited in
	// order, so the values consumed from r depend only on the step sizes.
	shuffleStep := func(nodes []*TaskNode) {
		swap := func(a, b int) {
			nodes[a], nodes[b] = nodes[b], nodes[a]
		}
		r.Shuffle(len(nodes), swap)
	}

	return func(tasks []*Task) [][]*TaskNode {
		result := breakFn(tasks)
		for idx := range result {
			shuffleStep(result[idx])
		}
		return result
	}
}

// Parallelize takes the given breakFn and splits any TaskNode's tasks up
// into parallel groups. If breakFn returns an empty array or a single
// array item with a single task node, that is considered a no-op.
func (g *TaskGraph) Parallelize(breakFn func([]*Task) [][]*TaskNode) {
func (g *TaskGraph) Parallelize(breakFn BreakFunc) {
for i := 0; i < len(g.Nodes); i++ {
node := g.Nodes[i]
results := breakFn(node.Tasks)
Expand Down Expand Up @@ -403,6 +422,9 @@ type taskStatus struct {
success bool
}

// RunGraph executes the provided graph in order and in parallel up to maxParallelism. It will not start
// a new TaskNode until all of the prerequisites have completed. If fn returns an error, no dependencies
// of that node will be executed, but other independent edges will continue executing.
func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) []error {
nestedCtx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
Expand Down