Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
// the strategy whose artifacts are being deleted
AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"

// AnnotationKeyLastSeenVersion is the last seen version for the workflow
AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version"

// LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism.
LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit"

Expand Down
41 changes: 41 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type recentCompletions struct {
mutex gosync.RWMutex
}

type lastSeenVersions struct {
versions map[string]string
mutex gosync.RWMutex
}

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
// namespace of the workflow controller
Expand Down Expand Up @@ -153,6 +158,8 @@ type WorkflowController struct {
recentCompletions recentCompletions
// lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled
lastUnreconciledWorkflows map[string]*wfv1.Workflow

lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version
}

const (
Expand Down Expand Up @@ -205,6 +212,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
eventRecorderManager: events.NewEventRecorderManager(kubeclientset),
progressPatchTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second),
lastSeenVersions: lastSeenVersions{
versions: make(map[string]string),
mutex: gosync.RWMutex{},
},
}

if executorPlugins {
Expand Down Expand Up @@ -724,6 +735,12 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

if wfc.isOutdated(un) {
logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event")
wfc.wfQueue.AddRateLimited(key)
return true
}

if !reconciliationNeeded(un) {
logger.WithField("key", key).Debug(ctx, "Won't process Workflow since it's completed")
return true
Expand Down Expand Up @@ -946,6 +963,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
if !needed {
key, _ := cache.MetaNamespaceKeyFunc(un)
wfc.recordCompletedWorkflow(key)
wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un))
}
return needed
},
Expand Down Expand Up @@ -1003,6 +1021,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
// no need to add to the queue - this workflow is done
wfc.throttler.Remove(key)
}
wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(obj.(*unstructured.Unstructured)))
},
},
},
Expand Down Expand Up @@ -1346,3 +1365,25 @@ func (wfc *WorkflowController) IsLeader() bool {
// the wfc.wfInformer is nil if it is not the leader
return wfc.wfInformer != nil
}

func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
wfc.lastSeenVersions.mutex.RLock()
defer wfc.lastSeenVersions.mutex.RUnlock()
lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
// always process if not seen before
if !ok || lastSeenRV == "" {
return false
}
annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
return annotations != lastSeenRV
}

func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string {
return string(wf.GetUID())
}

func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) {
wfc.lastSeenVersions.mutex.Lock()
defer wfc.lastSeenVersions.mutex.Unlock()
delete(wfc.lastSeenVersions.versions, key)
}
22 changes: 22 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.log.WithError(err).Warn(ctx, "error updating taskset")
}

oldRV := woc.wf.ResourceVersion
woc.updateLastSeenVersionAnnotation(oldRV)
wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
if err != nil {
woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow")
Expand All @@ -784,6 +786,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes)
}

woc.updateLastSeenVersion(oldRV)
// The workflow returned from wfClient.Update doesn't have a TypeMeta associated
// with it, so copy from the original workflow.
woc.wf.TypeMeta = woc.orig.TypeMeta
Expand Down Expand Up @@ -859,9 +862,12 @@ func (woc *wfOperationCtx) writeBackToInformer() error {
func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
woc.wf = woc.orig.DeepCopy()
woc.markWorkflowError(ctx, err)
oldRV := woc.wf.ResourceVersion
_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
if err != nil {
woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error")
} else {
woc.updateLastSeenVersion(oldRV)
}
}

Expand Down Expand Up @@ -4393,3 +4399,19 @@ func (woc *wfOperationCtx) setNodeDisplayName(ctx context.Context, node *wfv1.No
newNode.DisplayName = displayName
woc.wf.Status.Nodes.Set(ctx, nodeID, *newNode)
}

func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) {
if woc.wf.GetAnnotations() == nil {
woc.wf.SetAnnotations(make(map[string]string))
}
woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value
}

func (woc *wfOperationCtx) updateLastSeenVersion(value string) {
woc.controller.lastSeenVersions.mutex.Lock()
defer woc.controller.lastSeenVersions.mutex.Unlock()
if woc.controller.lastSeenVersions.versions == nil {
woc.controller.lastSeenVersions.versions = make(map[string]string)
}
woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value
}
Loading