Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
// the strategy whose artifacts are being deleted
AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"

// AnnotationKeyLastSeenVersion stores the last seen version of the workflow when it was last successfully processed by the controller
AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version"

// LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism.
LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit"

Expand Down
41 changes: 41 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type recentCompletions struct {
mutex gosync.RWMutex
}

type lastSeenVersions struct {
versions map[string]string
mutex gosync.RWMutex
}

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
// namespace of the workflow controller
Expand Down Expand Up @@ -153,6 +158,8 @@ type WorkflowController struct {
recentCompletions recentCompletions
// lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled
lastUnreconciledWorkflows map[string]*wfv1.Workflow

lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version
}

const (
Expand Down Expand Up @@ -205,6 +212,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
eventRecorderManager: events.NewEventRecorderManager(kubeclientset),
progressPatchTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second),
lastSeenVersions: lastSeenVersions{
versions: make(map[string]string),
mutex: gosync.RWMutex{},
},
}

if executorPlugins {
Expand Down Expand Up @@ -724,6 +735,12 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

if wfc.isOutdated(un) {
logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event")
wfc.wfQueue.AddRateLimited(key)
return true
}

if !reconciliationNeeded(un) {
logger.WithField("key", key).Debug(ctx, "Won't process Workflow since it's completed")
return true
Expand Down Expand Up @@ -946,6 +963,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
if !needed {
key, _ := cache.MetaNamespaceKeyFunc(un)
wfc.recordCompletedWorkflow(key)
wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un))
}
return needed
},
Expand Down Expand Up @@ -1003,6 +1021,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
// no need to add to the queue - this workflow is done
wfc.throttler.Remove(key)
}
wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(obj.(*unstructured.Unstructured)))
},
},
},
Expand Down Expand Up @@ -1346,3 +1365,25 @@ func (wfc *WorkflowController) IsLeader() bool {
// the wfc.wfInformer is nil if it is not the leader
return wfc.wfInformer != nil
}

func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
wfc.lastSeenVersions.mutex.RLock()
defer wfc.lastSeenVersions.mutex.RUnlock()
lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
// always process if not seen before
if !ok || lastSeenRV == "" {
return false
}
annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
return annotations != lastSeenRV
}

func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string {
return string(wf.GetUID())
}

func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) {
wfc.lastSeenVersions.mutex.Lock()
defer wfc.lastSeenVersions.mutex.Unlock()
delete(wfc.lastSeenVersions.versions, key)
}
23 changes: 23 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.log.WithError(err).Warn(ctx, "error updating taskset")
}

oldRV := woc.wf.ResourceVersion
woc.updateLastSeenVersionAnnotation(oldRV)
wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
if err != nil {
woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow")
Expand All @@ -784,6 +786,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes)
}

woc.updateLastSeenVersion(oldRV)
// The workflow returned from wfClient.Update doesn't have a TypeMeta associated
// with it, so copy from the original workflow.
woc.wf.TypeMeta = woc.orig.TypeMeta
Expand Down Expand Up @@ -860,9 +863,13 @@ func (woc *wfOperationCtx) writeBackToInformer() error {
func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
woc.wf = woc.orig.DeepCopy()
woc.markWorkflowError(ctx, err)
oldRV := woc.wf.ResourceVersion
woc.updateLastSeenVersionAnnotation(oldRV)
_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
if err != nil {
woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error")
} else {
woc.updateLastSeenVersion(oldRV)
}
}

Expand Down Expand Up @@ -4394,3 +4401,19 @@ func (woc *wfOperationCtx) setNodeDisplayName(ctx context.Context, node *wfv1.No
newNode.DisplayName = displayName
woc.wf.Status.Nodes.Set(ctx, nodeID, *newNode)
}

func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) {
if woc.wf.GetAnnotations() == nil {
woc.wf.SetAnnotations(make(map[string]string))
}
woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value
}

func (woc *wfOperationCtx) updateLastSeenVersion(value string) {
woc.controller.lastSeenVersions.mutex.Lock()
defer woc.controller.lastSeenVersions.mutex.Unlock()
if woc.controller.lastSeenVersions.versions == nil {
woc.controller.lastSeenVersions.versions = make(map[string]string)
}
woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value
}
Loading