diff --git a/docs/environment-variables.md b/docs/environment-variables.md index cd7d469e0df9..782cc27d675f 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -57,6 +57,7 @@ This document outlines environment variables that can be used to customize behav | `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore | | `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. | | `SKIP_WORKFLOW_DURATION_ESTIMATION` | `bool` | `false` | Whether to lookup resource usage from prior workflows to estimate usage for new workflows. | +| `PROCESSED_WORKFLOW_VERSIONS_TTL` | `time.Duration` | `10s` | How long an entry is kept in the processed workflow versions cache. Set this to the maximum workflow informer delay you anticipate, so that the same workflow version is not processed multiple times. | CLI parameters of the Controller can be specified as environment variables with the `ARGO_` prefix. 
For example: diff --git a/go.mod b/go.mod index ecc8393d5a8a..a84b61df2ed5 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible github.com/Masterminds/sprig/v3 v3.3.0 github.com/TwiN/go-color v1.4.1 + github.com/TwiN/gocache/v2 v2.2.2 github.com/alibabacloud-go/tea v1.3.9 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aliyun/credentials-go v1.4.6 @@ -46,6 +47,7 @@ require ( github.com/prometheus/common v0.64.0 github.com/robfig/cron/v3 v3.0.1 github.com/sethvargo/go-limiter v1.0.0 + github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.9.1 github.com/spf13/pflag v1.0.6 github.com/spf13/viper v1.20.1 @@ -92,7 +94,6 @@ require ( github.com/olekukonko/errors v1.1.0 // indirect github.com/olekukonko/ll v0.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect ) require ( diff --git a/go.sum b/go.sum index d47013fccc9a..6e47fa58edd2 100644 --- a/go.sum +++ b/go.sum @@ -103,6 +103,8 @@ github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBi github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE= github.com/TwiN/go-color v1.4.1 h1:mqG0P/KBgHKVqmtL5ye7K0/Gr4l6hTksPgTgMk3mUzc= github.com/TwiN/go-color v1.4.1/go.mod h1:WcPf/jtiW95WBIsEeY1Lc/b8aaWoiqQpu5cf8WFxu+s= +github.com/TwiN/gocache/v2 v2.2.2 h1:4HToPfDV8FSbaYO5kkbhLpEllUYse5rAf+hVU/mSsuI= +github.com/TwiN/gocache/v2 v2.2.2/go.mod h1:WfIuwd7GR82/7EfQqEtmLFC3a2vqaKbs4Pe6neB7Gyc= github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 h1:ZBbLwSJqkHBuFDA6DUhhse0IGJ7T5bemHyNILUjvOq4= github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w= github.com/UnnoTed/fileb0x v1.1.4/go.mod h1:X59xXT18tdNk/D6j+KZySratBsuKJauMtVuJ9cgOiZs= diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 
4598ac37038c..908c1fe530e9 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -153,6 +153,7 @@ type WorkflowController struct { recentCompletions recentCompletions // lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled lastUnreconciledWorkflows map[string]*wfv1.Workflow + workflowVersionChecker *workflowVersionChecker } const ( @@ -175,6 +176,10 @@ var ( // believe it cannot run. By delaying for 1s, we would have finished the semaphore counter // updates, and the next workflow will see the updated availability. semaphoreNotifyDelay = env.LookupEnvDurationOr(logging.InitLoggerInContext(), "SEMAPHORE_NOTIFY_DELAY", time.Second) + + // processedWorkflowVersionsTTL is the TTL of the processed workflow versions cache, read from PROCESSED_WORKFLOW_VERSIONS_TTL (default 10s). The controller + // will make sure versions in the cache won't be processed multiple times even with informer delay. + processedWorkflowVersionsTTL = env.LookupEnvDurationOr(logging.InitLoggerInContext(), "PROCESSED_WORKFLOW_VERSIONS_TTL", 10*time.Second) ) func init() { @@ -305,6 +310,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo "workflowArchive": wfArchiveWorkers, }).Info(ctx, "Current Worker Numbers") + wfc.workflowVersionChecker = NewWorkflowVersionChecker(processedWorkflowVersionsTTL) wfc.wfInformer = util.NewWorkflowInformer(ctx, wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) nsInformer, err := wfc.newNamespaceInformer(ctx, wfc.kubeclientset) if err != nil { @@ -725,6 +731,11 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } + if wfc.workflowVersionChecker.IsOutdated(un) { + logger.WithFields(logging.Fields{"key": key, "version": un.GetResourceVersion()}).Info(ctx, "Won't process Workflow since current status is outdated") + return true + } + + if !reconciliationNeeded(un) { logger.WithField("key", 
key).Debug(ctx, "Won't process Workflow since it's completed")
 		return true
diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go
index b061a293ecb0..fa70512d9fcf 100644
--- a/workflow/controller/controller_test.go
+++ b/workflow/controller/controller_test.go
@@ -300,6 +300,7 @@ func newController(ctx context.Context, options ...interface{}) (context.CancelF
 		progressPatchTickDuration: envutil.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute),
 		progressFileTickDuration: envutil.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second),
 		maxStackDepth: maxAllowedStackDepth,
+		workflowVersionChecker: NewWorkflowVersionChecker(NoExpiration),
 	}
 
 	for _, opt := range options {
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 2f4fac1df744..143ec5f61654 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -794,6 +794,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 		panic("workflow should be hydrated")
 	}
 
+	woc.updateOutdatedVersionIfChanged(ctx, woc.orig, woc.wf)
 	woc.log.WithFields(logging.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info(ctx, "Workflow update successful")
 
 	switch os.Getenv("INFORMER_WRITE_BACK") {
@@ -826,6 +827,14 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 	woc.queuePodsForCleanup(ctx)
 }
 
+// updateOutdatedVersionIfChanged marks origWf's resource version as outdated in the
+// controller's workflowVersionChecker when newWf carries a different resource version.
+// When both versions are equal nothing is recorded and an info message is logged instead.
+func (woc *wfOperationCtx) updateOutdatedVersionIfChanged(ctx context.Context, origWf metav1.Object, newWf metav1.Object) {
+	staleVersion := origWf.GetResourceVersion()
+	if staleVersion == newWf.GetResourceVersion() {
+		woc.log.WithFields(logging.Fields{"workflow": origWf.GetName(), "version": staleVersion}).Info(ctx, "Version not changed. Will not update outdated version")
+		return
+	}
+	woc.controller.workflowVersionChecker.UpdateOutdatedVersion(origWf)
+}
+
 func (woc *wfOperationCtx) checkTaskResultsInProgress(ctx context.Context) bool {
 	woc.log.WithField("status", woc.wf.Status.TaskResultsCompletionStatus).Debug(ctx, "Task results completion status")
 	return woc.wf.Status.TaskResultsInProgress()
@@ -935,6 +944,7 @@ func (woc *wfOperationCtx) reapplyUpdate(ctx context.Context, wfClient v1alpha1.
 		if err == nil {
 			woc.log.WithField("attempt", attempt).Info(ctx, "Update retry attempt successful")
 			woc.controller.hydrator.HydrateWithNodes(wf, nodes)
+			woc.updateOutdatedVersionIfChanged(ctx, currWf, wf)
 			return wf, nil
 		}
 		attempt++
diff --git a/workflow/controller/workflow_version_checker.go b/workflow/controller/workflow_version_checker.go
new file mode 100644
index 000000000000..d04baef55de4
--- /dev/null
+++ b/workflow/controller/workflow_version_checker.go
@@ -0,0 +1,44 @@
+package controller
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/TwiN/gocache/v2"
+	log "github.com/sirupsen/logrus"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/tools/cache"
+)
+
+// workflowVersionChecker is a cache used to check whether a workflow status is outdated.
+// It stores the outdated versions of workflows: the key is namespace/workflow:version and the value is always nil.
+// Set the TTL to the maximum informer delay you anticipate.
+type workflowVersionChecker struct {
+	// cache holds one entry per recorded "namespace/name:resourceVersion" key; the stored value is always nil.
+	cache *gocache.Cache
+}
+
+// NoExpiration is gocache.NoExpiration re-exported, so callers can pass gocache's
+// no-expiry TTL to NewWorkflowVersionChecker without importing gocache themselves.
+const NoExpiration = gocache.NoExpiration
+
+// NewWorkflowVersionChecker builds a checker whose recorded versions use ttl as their
+// default TTL. The underlying cache is configured with no size or memory limit.
+// The gocache janitor is started as well; if that fails a warning is logged and the
+// checker is still returned.
+func NewWorkflowVersionChecker(ttl time.Duration) *workflowVersionChecker {
+	log.WithFields(log.Fields{"ttl": ttl}).Info("Starting workflow version checker")
+	// Named store rather than cache so the imported client-go cache package is not shadowed.
+	store := gocache.NewCache().
+		WithDefaultTTL(ttl).
+		WithMaxSize(gocache.NoMaxSize).
+		WithMaxMemoryUsage(gocache.NoMaxMemoryUsage).
+		WithEvictionPolicy(gocache.LeastRecentlyUsed)
+	if err := store.StartJanitor(); err != nil {
+		log.WithError(err).Warn("Failed to start cache janitor, TTL functionality will be disabled")
+	}
+	return &workflowVersionChecker{cache: store}
+}
+
+// IsOutdated reports whether wf's current resource version is present in the cache,
+// i.e. it was recorded through UpdateOutdatedVersion and is still retrievable.
+// A nil or uninitialized checker has recorded nothing, so it reports false.
+func (c *workflowVersionChecker) IsOutdated(wf metav1.Object) bool {
+	if c == nil || c.cache == nil {
+		return false
+	}
+	_, found := c.cache.Get(toProcessedVersionKey(wf, wf.GetResourceVersion()))
+	return found
+}
+
+// UpdateOutdatedVersion records wf's current resource version as outdated and logs it.
+// It is a no-op on a nil or uninitialized checker.
+func (c *workflowVersionChecker) UpdateOutdatedVersion(wf metav1.Object) {
+	if c == nil || c.cache == nil {
+		return
+	}
+	c.cache.Set(toProcessedVersionKey(wf, wf.GetResourceVersion()), nil)
+	log.WithFields(log.Fields{"workflow": wf.GetName(), "version": wf.GetResourceVersion()}).Info("Workflow version checker: logged outdated version")
+}
+
+// toProcessedVersionKey returns the cache key for wf at the given resource version,
+// formatted as "<namespace/name key>:<version>".
+func toProcessedVersionKey(wf metav1.Object, version string) string {
+	key, err := cache.MetaNamespaceKeyFunc(wf)
+	if err != nil {
+		// An empty key would make every workflow share ":<version>", so rebuild the
+		// namespace/name part from the object's own accessors instead of dropping the error.
+		key = wf.GetNamespace() + "/" + wf.GetName()
+	}
+	return fmt.Sprintf("%s:%s", key, version)
+}
diff --git a/workflow/controller/workflow_version_checker_test.go b/workflow/controller/workflow_version_checker_test.go
new file mode 100644
index 000000000000..ec352ddbcd57
--- /dev/null
+++ b/workflow/controller/workflow_version_checker_test.go
@@ -0,0 +1,120 @@
+package controller
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// testWorkflow is a minimal stand-in for a workflow object: it only carries the
+// embedded ObjectMeta (name, namespace, resource version) that the tests set.
+type testWorkflow struct {
+	metav1.ObjectMeta
+}
+
+// GetResourceVersion returns the resource version stored in the embedded ObjectMeta.
+func (t *testWorkflow) GetResourceVersion() string {
+	return t.ResourceVersion
+}
+
+func TestWorkflowVersionChecker(t *testing.T) {
+	checker := NewWorkflowVersionChecker(NoExpiration)
+
+	// Test outdated version tracking
+	wf1 := &testWorkflow{
+
ObjectMeta: metav1.ObjectMeta{
+			Name:            "test-workflow",
+			Namespace:       "default",
+			ResourceVersion: "1",
+		},
+	}
+
+	// Record version 1 as outdated and confirm the checker reports it.
+	checker.UpdateOutdatedVersion(wf1)
+	assert.True(t, checker.IsOutdated(wf1), "Version 1 should be marked as outdated")
+
+	// The same workflow at another resource version was never recorded.
+	wf2 := newVersionedTestWorkflow("default", "test-workflow", "2")
+	assert.False(t, checker.IsOutdated(wf2), "Version 2 should not be marked as outdated")
+}
+
+// newVersionedTestWorkflow builds a testWorkflow fixture with the given namespace,
+// name and resource version.
+func newVersionedTestWorkflow(namespace, name, resourceVersion string) *testWorkflow {
+	return &testWorkflow{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Namespace:       namespace,
+			ResourceVersion: resourceVersion,
+		},
+	}
+}
+
+// Workflows that share a name and version but live in different namespaces must be
+// tracked independently.
+func TestWorkflowVersionCheckerWithDifferentNamespaces(t *testing.T) {
+	checker := NewWorkflowVersionChecker(10 * time.Minute)
+	recorded := newVersionedTestWorkflow("namespace1", "test-workflow", "1")
+	untouched := newVersionedTestWorkflow("namespace2", "test-workflow", "1")
+
+	// Only the namespace1 workflow is recorded.
+	checker.UpdateOutdatedVersion(recorded)
+
+	assert.True(t, checker.IsOutdated(recorded), "Workflow in namespace1 should be marked as outdated")
+	assert.False(t, checker.IsOutdated(untouched), "Workflow in namespace2 should not be marked as outdated")
+}
+
+// Workflows that share a namespace and version but have different names must be
+// tracked independently.
+func TestWorkflowVersionCheckerWithDifferentNames(t *testing.T) {
+	checker := NewWorkflowVersionChecker(10 * time.Minute)
+	recorded := newVersionedTestWorkflow("default", "workflow1", "1")
+	untouched := newVersionedTestWorkflow("default", "workflow2", "1")
+
+	// Only workflow1 is recorded.
+	checker.UpdateOutdatedVersion(recorded)
+
+	assert.True(t, checker.IsOutdated(recorded), "workflow1 should be marked as outdated")
+	assert.False(t, checker.IsOutdated(untouched), "workflow2 should not be marked as outdated")
+}
+
+// A recorded version must stop being reported once the checker's TTL has elapsed.
+func TestWorkflowVersionCheckerTTL(t *testing.T) {
+	const ttl = time.Millisecond
+	checker := NewWorkflowVersionChecker(ttl)
+	expiring := newVersionedTestWorkflow("default", "test-workflow", "1")
+	checker.UpdateOutdatedVersion(expiring)
+
+	// Sleep for twice the TTL so the entry has expired by the time it is read back.
+	time.Sleep(2 * ttl)
+
+	assert.False(t, checker.IsOutdated(expiring), "Version should no longer be marked as outdated after TTL")
+}