Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ This document outlines environment variables that can be used to customize behav
| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore |
| `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. |
| `SKIP_WORKFLOW_DURATION_ESTIMATION` | `bool` | `false` | Whether to lookup resource usage from prior workflows to estimate usage for new workflows. |
| `PROCESSED_WORKFLOW_VERSIONS_TTL` | `time.Duration` | `10s` | How long the workflow versions cache stores. Should be set as the max workflow informer delay anticipated to avoid processing the same workflow version multiple times. |

CLI parameters of the Controller can be specified as environment variables with the `ARGO_` prefix.
For example:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/Masterminds/sprig/v3 v3.3.0
github.com/TwiN/go-color v1.4.1
github.com/TwiN/gocache/v2 v2.2.2
github.com/alibabacloud-go/tea v1.3.9
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aliyun/credentials-go v1.4.6
Expand Down Expand Up @@ -46,6 +47,7 @@ require (
github.com/prometheus/common v0.64.0
github.com/robfig/cron/v3 v3.0.1
github.com/sethvargo/go-limiter v1.0.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.6
github.com/spf13/viper v1.20.1
Expand Down Expand Up @@ -92,7 +94,6 @@ require (
github.com/olekukonko/errors v1.1.0 // indirect
github.com/olekukonko/ll v0.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBi
github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
github.com/TwiN/go-color v1.4.1 h1:mqG0P/KBgHKVqmtL5ye7K0/Gr4l6hTksPgTgMk3mUzc=
github.com/TwiN/go-color v1.4.1/go.mod h1:WcPf/jtiW95WBIsEeY1Lc/b8aaWoiqQpu5cf8WFxu+s=
github.com/TwiN/gocache/v2 v2.2.2 h1:4HToPfDV8FSbaYO5kkbhLpEllUYse5rAf+hVU/mSsuI=
github.com/TwiN/gocache/v2 v2.2.2/go.mod h1:WfIuwd7GR82/7EfQqEtmLFC3a2vqaKbs4Pe6neB7Gyc=
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 h1:ZBbLwSJqkHBuFDA6DUhhse0IGJ7T5bemHyNILUjvOq4=
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
github.com/UnnoTed/fileb0x v1.1.4/go.mod h1:X59xXT18tdNk/D6j+KZySratBsuKJauMtVuJ9cgOiZs=
Expand Down
11 changes: 11 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type WorkflowController struct {
recentCompletions recentCompletions
// lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled
lastUnreconciledWorkflows map[string]*wfv1.Workflow
workflowVersionChecker *workflowVersionChecker
}

const (
Expand All @@ -175,6 +176,10 @@ var (
// believe it cannot run. By delaying for 1s, we would have finished the semaphore counter
// updates, and the next workflow will see the updated availability.
semaphoreNotifyDelay = env.LookupEnvDurationOr(logging.InitLoggerInContext(), "SEMAPHORE_NOTIFY_DELAY", time.Second)

// processedWorfklowVersionsTTL is the ttl of the processed workflow versions cache. The controller
// will make sure versions in the cache won't be processed multiple times even with informer delay.
processedWorfklowVersionsTTL = env.LookupEnvDurationOr(logging.InitLoggerInContext(), "PROCESSED_WORKFLOW_VERSIONS_TTL", 10*time.Second)
)

func init() {
Expand Down Expand Up @@ -305,6 +310,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
"workflowArchive": wfArchiveWorkers,
}).Info(ctx, "Current Worker Numbers")

wfc.workflowVersionChecker = NewWorkflowVersionChecker(processedWorfklowVersionsTTL)
wfc.wfInformer = util.NewWorkflowInformer(ctx, wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
nsInformer, err := wfc.newNamespaceInformer(ctx, wfc.kubeclientset)
if err != nil {
Expand Down Expand Up @@ -725,6 +731,11 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

if wfc.workflowVersionChecker.IsOutdated(un) {
logger.WithFields(logging.Fields{"key": key, "version": un.GetResourceVersion()}).Info(ctx, "Won't process Workflow since current status is outdated")
return true
}

if !reconciliationNeeded(un) {
logger.WithField("key", key).Debug(ctx, "Won't process Workflow since it's completed")
return true
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ func newController(ctx context.Context, options ...interface{}) (context.CancelF
progressPatchTickDuration: envutil.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: envutil.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second),
maxStackDepth: maxAllowedStackDepth,
workflowVersionChecker: NewWorkflowVersionChecker(NoExpiration),
}

for _, opt := range options {
Expand Down
10 changes: 10 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
panic("workflow should be hydrated")
}

woc.updateOutdatedVersionIfChanged(ctx, woc.orig, woc.wf)
woc.log.WithFields(logging.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info(ctx, "Workflow update successful")

switch os.Getenv("INFORMER_WRITE_BACK") {
Expand Down Expand Up @@ -826,6 +827,14 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.queuePodsForCleanup(ctx)
}

func (woc *wfOperationCtx) updateOutdatedVersionIfChanged(ctx context.Context, origWf metav1.Object, newWf metav1.Object) {
if origWf.GetResourceVersion() != newWf.GetResourceVersion() {
woc.controller.workflowVersionChecker.UpdateOutdatedVersion(origWf)
} else {
woc.log.WithFields(logging.Fields{"workflow": origWf.GetName(), "version": origWf.GetResourceVersion()}).Info(ctx, "Version not changed. Will not update outdated version")
}
}

func (woc *wfOperationCtx) checkTaskResultsInProgress(ctx context.Context) bool {
woc.log.WithField("status", woc.wf.Status.TaskResultsCompletionStatus).Debug(ctx, "Task results completion status")
return woc.wf.Status.TaskResultsInProgress()
Expand Down Expand Up @@ -935,6 +944,7 @@ func (woc *wfOperationCtx) reapplyUpdate(ctx context.Context, wfClient v1alpha1.
if err == nil {
woc.log.WithField("attempt", attempt).Info(ctx, "Update retry attempt successful")
woc.controller.hydrator.HydrateWithNodes(wf, nodes)
woc.updateOutdatedVersionIfChanged(ctx, currWf, wf)
return wf, nil
}
attempt++
Expand Down
44 changes: 44 additions & 0 deletions workflow/controller/workflow_version_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package controller

import (
"fmt"
"time"

"github.com/TwiN/gocache/v2"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)

// workflowVersionChecker is a cache used to check if a workflow status is outdated.
// It stores outdated versions of workflows, use the namespace/workflow:version as the key, value is nil.
// Set the ttl as the max delay of the informer you anticipate.
type workflowVersionChecker struct {
cache *gocache.Cache
}

const NoExpiration = gocache.NoExpiration

func NewWorkflowVersionChecker(ttl time.Duration) *workflowVersionChecker {
log.WithFields(log.Fields{"ttl": ttl}).Info("Starting workflow version checker")
cache := gocache.NewCache().WithDefaultTTL(ttl).WithMaxSize(gocache.NoMaxSize).WithMaxMemoryUsage(gocache.NoMaxMemoryUsage).WithEvictionPolicy(gocache.LeastRecentlyUsed)
if err := cache.StartJanitor(); err != nil {
log.WithError(err).Warn("Failed to start cache janitor, TTL functionality will be disabled")
}
return &workflowVersionChecker{cache: cache}
}

func (c *workflowVersionChecker) IsOutdated(wf metav1.Object) bool {
_, exist := c.cache.Get(toProcessedVersionKey(wf, wf.GetResourceVersion()))
return exist
}

func (c *workflowVersionChecker) UpdateOutdatedVersion(wf metav1.Object) {
c.cache.Set(toProcessedVersionKey(wf, wf.GetResourceVersion()), nil)
log.WithFields(log.Fields{"workflow": wf.GetName(), "version": wf.GetResourceVersion()}).Info("Workflow version checker: logged outdated version")
}

func toProcessedVersionKey(wf metav1.Object, version string) string {
key, _ := cache.MetaNamespaceKeyFunc(wf)
return fmt.Sprintf("%s:%s", key, version)
}
120 changes: 120 additions & 0 deletions workflow/controller/workflow_version_checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package controller

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type testWorkflow struct {
metav1.ObjectMeta
}

func (t *testWorkflow) GetResourceVersion() string {
return t.ResourceVersion
}

func TestWorkflowVersionChecker(t *testing.T) {
checker := NewWorkflowVersionChecker(NoExpiration)

// Test outdated version tracking
wf1 := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "test-workflow",
Namespace: "default",
ResourceVersion: "1",
},
}

// Mark version 1 as outdated
checker.UpdateOutdatedVersion(wf1)

// Verify version 1 is marked as outdated
assert.True(t, checker.IsOutdated(wf1), "Version 1 should be marked as outdated")

// Test different workflow
wf2 := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "test-workflow",
Namespace: "default",
ResourceVersion: "2",
},
}
assert.False(t, checker.IsOutdated(wf2), "Version 2 should not be marked as outdated")
}

func TestWorkflowVersionCheckerWithDifferentNamespaces(t *testing.T) {
checker := NewWorkflowVersionChecker(10 * time.Minute)

// Create workflows in different namespaces
wf1 := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "test-workflow",
Namespace: "namespace1",
ResourceVersion: "1",
},
}
wf2 := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "test-workflow",
Namespace: "namespace2",
ResourceVersion: "1",
},
}

// Mark version 1 in namespace1 as outdated
checker.UpdateOutdatedVersion(wf1)

// Check that they don't interfere with each other
assert.True(t, checker.IsOutdated(wf1), "Workflow in namespace1 should be marked as outdated")
assert.False(t, checker.IsOutdated(wf2), "Workflow in namespace2 should not be marked as outdated")
}

func TestWorkflowVersionCheckerWithDifferentNames(t *testing.T) {
checker := NewWorkflowVersionChecker(10 * time.Minute)

// Create workflows with different names
wf1 := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "workflow1",
Namespace: "default",
ResourceVersion: "1",
},
}
wf2 := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "workflow2",
Namespace: "default",
ResourceVersion: "1",
},
}

// Mark workflow1 version 1 as outdated
checker.UpdateOutdatedVersion(wf1)

// Check that they don't interfere with each other
assert.True(t, checker.IsOutdated(wf1), "workflow1 should be marked as outdated")
assert.False(t, checker.IsOutdated(wf2), "workflow2 should not be marked as outdated")
}

func TestWorkflowVersionCheckerTTL(t *testing.T) {
checker := NewWorkflowVersionChecker(time.Millisecond)

// Create and mark a workflow version as outdated
wf := &testWorkflow{
ObjectMeta: metav1.ObjectMeta{
Name: "test-workflow",
Namespace: "default",
ResourceVersion: "1",
},
}
checker.UpdateOutdatedVersion(wf)

// Wait for TTL to expire
time.Sleep(time.Millisecond * 2)

// Verify it's no longer marked as outdated
assert.False(t, checker.IsOutdated(wf), "Version should no longer be marked as outdated after TTL")
}
Loading