From 75f4708c9c12b97ad6a49cd85a4483074dd97793 Mon Sep 17 00:00:00 2001 From: shuangkun Date: Sun, 21 Sep 2025 18:56:03 +0800 Subject: [PATCH 1/6] feat: add throttle queue to speed up eventhandle processing. Co-authored-by: lons Co-authored-by: shuangkun Signed-off-by: shuangkun --- ...dd-throttlequeue-to-speedup-eventhandle.md | 16 ++++ cmd/workflow-controller/main.go | 6 +- workflow/controller/controller.go | 65 ++++++++++++-- workflow/controller/controller_test.go | 1 + workflow/sync/throttle_key.go | 47 ++++++++++ workflow/sync/throttle_key_test.go | 88 +++++++++++++++++++ 6 files changed, 213 insertions(+), 10 deletions(-) create mode 100644 .features/pending/add-throttlequeue-to-speedup-eventhandle.md create mode 100644 workflow/sync/throttle_key.go create mode 100644 workflow/sync/throttle_key_test.go diff --git a/.features/pending/add-throttlequeue-to-speedup-eventhandle.md b/.features/pending/add-throttlequeue-to-speedup-eventhandle.md new file mode 100644 index 000000000000..06c7379c33d5 --- /dev/null +++ b/.features/pending/add-throttlequeue-to-speedup-eventhandle.md @@ -0,0 +1,16 @@ + +Description: +Author: +Component: +Issues: + + diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 03cd207c999c..6c482b240130 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -58,6 +58,7 @@ func NewRootCommand() *cobra.Command { podCleanupWorkers int // --pod-cleanup-workers cronWorkflowWorkers int // --cron-workflow-workers workflowArchiveWorkers int // --workflow-archive-workers + workflowThrottleWorkers int // --workflow-throttle-workers burst int qps float32 namespaced bool // --namespaced @@ -123,7 +124,7 @@ func NewRootCommand() *cobra.Command { log.Info(ctx, "Leader election is turned off. Running in single-instance mode") log.WithField("id", "single-instance").Info(ctx, "starting leading") - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers, workflowThrottleWorkers) go wfController.RunPrometheusServer(ctx, false) } else { nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY") @@ -160,7 +161,7 @@ func NewRootCommand() *cobra.Command { OnStartedLeading: func(ctx context.Context) { dummyCancel() wg.Wait() - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers, workflowThrottleWorkers) wg.Add(1) go func() { wfController.RunPrometheusServer(ctx, false) @@ -203,6 +204,7 @@ func NewRootCommand() *cobra.Command { command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers") command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers") command.Flags().IntVar(&workflowArchiveWorkers, "workflow-archive-workers", 8, "Number of workflow archive workers") + command.Flags().IntVar(&workflowThrottleWorkers, "workflow-throttle-workers", 8, "Number of workflow throttle workers") command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.") command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second") command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4598ac37038c..cd8d4641bdb6 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -126,6 +126,7 @@ type WorkflowController struct { configMapInformer cache.SharedIndexInformer wfQueue workqueue.TypedRateLimitingInterface[string] wfArchiveQueue workqueue.TypedRateLimitingInterface[string] + wfThrottleQueue workqueue.TypedRateLimitingInterface[string] throttler sync.Throttler workflowKeyLock syncpkg.KeyLock // used to lock workflows for exclusive modification or access session db.Session @@ -234,13 +235,16 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli workqueue.SetProvider(wfc.metrics) // must execute SetProvider before we create the queues wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, &fixedItemIntervalRateLimiter{}, "workflow_queue") wfc.throttler = wfc.newThrottler() + wfc.wfThrottleQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultTypedControllerRateLimiter[string](), "workflow_throttle_queue") wfc.wfArchiveQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultTypedControllerRateLimiter[string](), "workflow_archive_queue") return &wfc, nil } func (wfc *WorkflowController) newThrottler() sync.Throttler { - f := func(key string) { wfc.wfQueue.Add(key) } + f := func(key string) { + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, 0, time.Now(), sync.ThrottleActionAdd)) + } return sync.NewMultiThrottler(wfc.Config.Parallelism, wfc.Config.NamespaceParallelism, f) } @@ -279,7 +283,7 @@ var indexers = cache.Indexers{ } // Run starts a Workflow resource controller -func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, wfArchiveWorkers int) { +func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, wfArchiveWorkers, wfThrottleWorkers int) { defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) logger := logging.RequireLoggerFromContext(ctx) @@ -303,6 +307,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo "podCleanup": podCleanupWorkers, "cronWorkflowWorkers": cronWorkflowWorkers, "workflowArchive": wfArchiveWorkers, + "workflowThrottle": wfThrottleWorkers, }).Info(ctx, "Current Worker Numbers") wfc.wfInformer = util.NewWorkflowInformer(ctx, wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) @@ -389,6 +394,12 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo for i := 0; i < wfArchiveWorkers; i++ { go wait.UntilWithContext(archiveCtx, wfc.runArchiveWorker, time.Second) } + + throttleCtx, _ := logger.WithField("component", "throttle_worker").InContext(ctx) + for i := 0; i < wfThrottleWorkers; i++ { + go wait.UntilWithContext(throttleCtx, wfc.runThrottleWorker, time.Second) + } + if cacheGCPeriod != 0 { go wait.JitterUntilWithContext(ctx, wfc.syncAllCacheForGC, cacheGCPeriod, 0.0, true) } @@ -700,6 +711,13 @@ func (wfc *WorkflowController) runArchiveWorker(ctx context.Context) { } } +func (wfc *WorkflowController) runThrottleWorker(ctx context.Context) { + defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) + + for wfc.processNextThrottleItem(ctx) { + } +} + // processNextItem is the worker logic for handling workflow updates func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { key, quit := wfc.wfQueue.Get() @@ -808,6 +826,40 @@ func (wfc *WorkflowController) processNextArchiveItem(ctx context.Context) bool return true } +func (wfc *WorkflowController) processNextThrottleItem(ctx context.Context) bool { + throttleKey, quit := wfc.wfThrottleQueue.Get() + if quit { + return false + } + defer wfc.wfThrottleQueue.Done(throttleKey) + + logger := logging.RequireLoggerFromContext(ctx) + key, priority, creation, action := sync.ParseThrottleKey(throttleKey) + if key == "" { + logger.WithField("throttleKey", throttleKey).Warn(ctx, "Failed to parse throttle key") + return true + } + + logger.WithField("key", key). + WithField("action", action). + WithField("priority", priority). + WithField("creation", creation). + Debug(ctx, "Processing throttle item") + + switch action { + case sync.ThrottleActionAdd, sync.ThrottleActionUpdate: + wfc.throttler.Add(key, priority, creation) + case sync.ThrottleActionDelete: + wfc.throttler.Remove(key) + default: + logger.WithField("action", action).Warn(ctx, "Unknown throttle action") + } + + // Add the workflow to the main queue for processing + wfc.wfQueue.Add(key) + return true +} + func (wfc *WorkflowController) getWorkflowByKey(ctx context.Context, key string) (interface{}, bool) { logger := logging.RequireLoggerFromContext(ctx) obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key) @@ -953,9 +1005,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { // for a new workflow, we do not want to rate limit its execution using AddRateLimited - wfc.wfQueue.AddAfter(key, wfc.Config.InitialDelay.Duration) priority, creation := getWfPriority(obj) - wfc.throttler.Add(key, priority, creation) + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, priority, creation, sync.ThrottleActionAdd)) } }, // This function is called when an updated (we already know about this object) @@ -968,9 +1019,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) } key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { - wfc.wfQueue.AddRateLimited(key) priority, creation := getWfPriority(new) - wfc.throttler.Add(key, priority, creation) + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, priority, creation, sync.ThrottleActionUpdate)) } }, // This function is called when an object is to be removed @@ -997,8 +1047,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) if err == nil { wfc.releaseAllWorkflowLocks(ctx, obj) wfc.recordCompletedWorkflow(key) - // no need to add to the queue - this workflow is done - wfc.throttler.Remove(key) + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, 0, time.Now(), sync.ThrottleActionDelete)) } }, }, diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index b061a293ecb0..9255c08be371 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -315,6 +315,7 @@ func newController(ctx context.Context, options ...interface{}) (context.CancelF wfc.metrics, testExporter, _ = metrics.CreateDefaultTestMetrics(ctx) wfc.entrypoint = entrypoint.New(kube, wfc.Config.Images) wfc.wfQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) + wfc.wfThrottleQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) wfc.throttler = wfc.newThrottler() wfc.rateLimiter = wfc.newRateLimiter() } diff --git a/workflow/sync/throttle_key.go b/workflow/sync/throttle_key.go new file mode 100644 index 000000000000..538783fca291 --- /dev/null +++ b/workflow/sync/throttle_key.go @@ -0,0 +1,47 @@ +package sync + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// workflowThrottleKey represents a key used in the throttle queue +// Format: "workflowKey/priority/creationTime/action" +type workflowThrottleKey = string + +// ThrottleAction represents the action type for throttle operations +type ThrottleAction string + +const ( + ThrottleActionAdd ThrottleAction = "add" + ThrottleActionUpdate ThrottleAction = "update" + ThrottleActionDelete ThrottleAction = "delete" +) + +// NewThrottleKey creates a throttle key with workflow key, priority, creation time and action +func NewThrottleKey(key string, priority int32, creation time.Time, action ThrottleAction) workflowThrottleKey { + // Use RFC3339 for time format to ensure parse compatibility + return fmt.Sprintf("%s/%d/%s/%s", key, priority, creation.Format(time.RFC3339), action) +} + +// ParseThrottleKey parses a throttle key back to its components +func ParseThrottleKey(throttleKey workflowThrottleKey) (key string, priority int32, creation time.Time, action ThrottleAction) { + parts := strings.SplitN(throttleKey, "/", 4) + if len(parts) != 4 { + return "", 0, time.Time{}, "" + } + key = parts[0] + priority64, err := strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return "", 0, time.Time{}, "" + } + priority = int32(priority64) + creation, err = time.Parse(time.RFC3339, parts[2]) + if err != nil { + return "", 0, time.Time{}, "" + } + action = ThrottleAction(parts[3]) + return key, priority, creation, action +} diff --git a/workflow/sync/throttle_key_test.go b/workflow/sync/throttle_key_test.go new file mode 100644 index 000000000000..64c81a6df4a8 --- /dev/null +++ b/workflow/sync/throttle_key_test.go @@ -0,0 +1,88 @@ +package sync + +import ( + "testing" + "time" +) + +func TestNewThrottleKey(t *testing.T) { + key := "test-namespace/test-workflow" + priority := int32(5) + creation := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC) + action := ThrottleActionAdd + + throttleKey := NewThrottleKey(key, priority, creation, action) + expected := "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add" + if throttleKey != expected { + t.Errorf("Expected %s, got %s", expected, throttleKey) + } +} + +func TestParseThrottleKey(t *testing.T) { + throttleKey := "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add" + key, priority, creation, action := ParseThrottleKey(throttleKey) + + expectedKey := "test-namespace/test-workflow" + expectedPriority := int32(5) + expectedCreation := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC) + expectedAction := ThrottleActionAdd + + if key != expectedKey { + t.Errorf("Expected key %s, got %s", expectedKey, key) + } + if priority != expectedPriority { + t.Errorf("Expected priority %d, got %d", expectedPriority, priority) + } + if !creation.Equal(expectedCreation) { + t.Errorf("Expected creation %v, got %v", expectedCreation, creation) + } + if action != expectedAction { + t.Errorf("Expected action %s, got %s", expectedAction, action) + } +} + +func TestParseThrottleKeyInvalid(t *testing.T) { + testCases := []struct { + name string + throttleKey string + }{ + {"empty", ""}, + {"too few parts", "test-namespace/test-workflow"}, + {"too many parts", "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add/extra"}, + {"invalid priority", "test-namespace/test-workflow/invalid/2023-01-01T12:00:00Z/add"}, + {"invalid time", "test-namespace/test-workflow/5/invalid-time/add"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + key, priority, creation, action := ParseThrottleKey(tc.throttleKey) + if key != "" || priority != 0 || !creation.IsZero() || action != "" { + t.Errorf("Expected empty values for invalid throttle key, got key=%s, priority=%d, creation=%v, action=%s", + key, priority, creation, action) + } + }) + } +} + +func TestThrottleKeyRoundTrip(t *testing.T) { + originalKey := "test-namespace/test-workflow" + originalPriority := int32(10) + originalCreation := time.Now().UTC() + originalAction := ThrottleActionUpdate + + throttleKey := NewThrottleKey(originalKey, originalPriority, originalCreation, originalAction) + parsedKey, parsedPriority, parsedCreation, parsedAction := ParseThrottleKey(throttleKey) + + if parsedKey != originalKey { + t.Errorf("Key mismatch: expected %s, got %s", originalKey, parsedKey) + } + if parsedPriority != originalPriority { + t.Errorf("Priority mismatch: expected %d, got %d", originalPriority, parsedPriority) + } + if !parsedCreation.Equal(originalCreation) { + t.Errorf("Creation time mismatch: expected %v, got %v", originalCreation, parsedCreation) + } + if parsedAction != originalAction { + t.Errorf("Action mismatch: expected %s, got %s", originalAction, parsedAction) + } +} From e9196cf4f84d4f8c23e8ae0f6db7907b02d3322f Mon Sep 17 00:00:00 2001 From: shuangkun Date: Sun, 21 Sep 2025 18:56:44 +0800 Subject: [PATCH 2/6] feat: add throttle queue to speed up eventhandle processing. Co-authored-by: lons Co-authored-by: shuangkun Signed-off-by: shuangkun --- workflow/controller/controller.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index cd8d4641bdb6..7245565c46e6 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -242,9 +242,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli } func (wfc *WorkflowController) newThrottler() sync.Throttler { - f := func(key string) { - wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, 0, time.Now(), sync.ThrottleActionAdd)) - } + f := func(key string) { wfc.wfQueue.Add(key) } return sync.NewMultiThrottler(wfc.Config.Parallelism, wfc.Config.NamespaceParallelism, f) } @@ -855,8 +853,6 @@ func (wfc *WorkflowController) processNextThrottleItem(ctx context.Context) bool logger.WithField("action", action).Warn(ctx, "Unknown throttle action") } - // Add the workflow to the main queue for processing - wfc.wfQueue.Add(key) return true } @@ -1005,6 +1001,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { // for a new workflow, we do not want to rate limit its execution using AddRateLimited + wfc.wfQueue.AddAfter(key, wfc.Config.InitialDelay.Duration) priority, creation := getWfPriority(obj) wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, priority, creation, sync.ThrottleActionAdd)) } @@ -1019,6 +1016,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) } key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { + wfc.wfQueue.AddRateLimited(key) priority, creation := getWfPriority(new) wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, priority, creation, sync.ThrottleActionUpdate)) } From ab61934a5ffde4c56636f14b1ce275f520d73340 Mon Sep 17 00:00:00 2001 From: shuangkun Date: Sun, 21 Sep 2025 19:14:00 +0800 Subject: [PATCH 3/6] fix: docs Signed-off-by: shuangkun --- ...dd-throttlequeue-to-speedup-eventhandle.md | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/.features/pending/add-throttlequeue-to-speedup-eventhandle.md b/.features/pending/add-throttlequeue-to-speedup-eventhandle.md index 06c7379c33d5..e46b2b3e109d 100644 --- a/.features/pending/add-throttlequeue-to-speedup-eventhandle.md +++ b/.features/pending/add-throttlequeue-to-speedup-eventhandle.md @@ -1,16 +1,10 @@ - -Description: -Author: -Component: -Issues: +Component: General +Issues: 14791 +Description: Add wfThrottleQueue to accelerate event handling. +Author: [Shuangkun Tian](https://github.com/shuangkun) - +In large-scale scenarios, the throttler's concurrent count calculation can become a bottleneck. +This feature improves performance by decoupling event reception from processing. +The new `wfThrottleQueue` allows the controller to handle workflow events more efficiently by separating throttle operations from the main workflow processing queue. +This reduces contention and improves throughput under high load conditions. +The feature is automatically enabled and can be configured using the `--workflow-throttle-workers` parameter. \ No newline at end of file From e55f0de8703519c32abcb2abe4bbf692f43bfb60 Mon Sep 17 00:00:00 2001 From: shuangkun Date: Sun, 21 Sep 2025 20:03:55 +0800 Subject: [PATCH 4/6] fix: ut. Signed-off-by: shuangkun --- workflow/sync/throttle_key.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/workflow/sync/throttle_key.go b/workflow/sync/throttle_key.go index 538783fca291..230f9692a919 100644 --- a/workflow/sync/throttle_key.go +++ b/workflow/sync/throttle_key.go @@ -28,20 +28,20 @@ func NewThrottleKey(key string, priority int32, creation time.Time, action Throt // ParseThrottleKey parses a throttle key back to its components func ParseThrottleKey(throttleKey workflowThrottleKey) (key string, priority int32, creation time.Time, action ThrottleAction) { - parts := strings.SplitN(throttleKey, "/", 4) - if len(parts) != 4 { + parts := strings.Split(throttleKey, "/") + if len(parts) != 5 { return "", 0, time.Time{}, "" } - key = parts[0] - priority64, err := strconv.ParseInt(parts[1], 10, 32) + key = fmt.Sprintf("%s/%s", parts[0], parts[1]) + priority64, err := strconv.ParseInt(parts[2], 10, 32) if err != nil { return "", 0, time.Time{}, "" } priority = int32(priority64) - creation, err = time.Parse(time.RFC3339, parts[2]) + creation, err = time.Parse(time.RFC3339, parts[3]) if err != nil { return "", 0, time.Time{}, "" } - action = ThrottleAction(parts[3]) + action = ThrottleAction(parts[4]) return key, priority, creation, action } From 71952cf6d5c6528153f0fa1180d966bdae4fe1de Mon Sep 17 00:00:00 2001 From: shuangkun Date: Sun, 21 Sep 2025 20:29:44 +0800 Subject: [PATCH 5/6] fix: ut. Signed-off-by: shuangkun --- workflow/sync/throttle_key_test.go | 56 +++++++++++------------------- 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/workflow/sync/throttle_key_test.go b/workflow/sync/throttle_key_test.go index 64c81a6df4a8..3963a902702b 100644 --- a/workflow/sync/throttle_key_test.go +++ b/workflow/sync/throttle_key_test.go @@ -3,6 +3,8 @@ package sync import ( "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestNewThrottleKey(t *testing.T) { @@ -13,9 +15,7 @@ func TestNewThrottleKey(t *testing.T) { throttleKey := NewThrottleKey(key, priority, creation, action) expected := "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add" - if throttleKey != expected { - t.Errorf("Expected %s, got %s", expected, throttleKey) - } + assert.Equal(t, expected, throttleKey, "Throttle key should match expected format") } func TestParseThrottleKey(t *testing.T) { @@ -27,22 +27,14 @@ func TestParseThrottleKey(t *testing.T) { expectedCreation := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC) expectedAction := ThrottleActionAdd - if key != expectedKey { - t.Errorf("Expected key %s, got %s", expectedKey, key) - } - if priority != expectedPriority { - t.Errorf("Expected priority %d, got %d", expectedPriority, priority) - } - if !creation.Equal(expectedCreation) { - t.Errorf("Expected creation %v, got %v", expectedCreation, creation) - } - if action != expectedAction { - t.Errorf("Expected action %s, got %s", expectedAction, action) - } + assert.Equal(t, expectedKey, key, "Parsed key should match expected") + assert.Equal(t, expectedPriority, priority, "Parsed priority should match expected") + assert.Equal(t, expectedCreation, creation, "Parsed creation time should match expected") + assert.Equal(t, expectedAction, action, "Parsed action should match expected") } func TestParseThrottleKeyInvalid(t *testing.T) { - testCases := []struct { + tests := []struct { name string throttleKey string }{ @@ -53,13 +45,13 @@ func TestParseThrottleKeyInvalid(t *testing.T) { {"invalid time", "test-namespace/test-workflow/5/invalid-time/add"}, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - key, priority, creation, action := ParseThrottleKey(tc.throttleKey) - if key != "" || priority != 0 || !creation.IsZero() || action != "" { - t.Errorf("Expected empty values for invalid throttle key, got key=%s, priority=%d, creation=%v, action=%s", - key, priority, creation, action) - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + key, priority, creation, action := ParseThrottleKey(tt.throttleKey) + assert.Empty(t, key, "Key should be empty for invalid throttle key") + assert.Equal(t, int32(0), priority, "Priority should be 0 for invalid throttle key") + assert.True(t, creation.IsZero(), "Creation time should be zero for invalid throttle key") + assert.Empty(t, action, "Action should be empty for invalid throttle key") }) } } @@ -67,22 +59,14 @@ func TestParseThrottleKeyInvalid(t *testing.T) { func TestThrottleKeyRoundTrip(t *testing.T) { originalKey := "test-namespace/test-workflow" originalPriority := int32(10) - originalCreation := time.Now().UTC() + originalCreation := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) originalAction := ThrottleActionUpdate throttleKey := NewThrottleKey(originalKey, originalPriority, originalCreation, originalAction) parsedKey, parsedPriority, parsedCreation, parsedAction := ParseThrottleKey(throttleKey) - if parsedKey != originalKey { - t.Errorf("Key mismatch: expected %s, got %s", originalKey, parsedKey) - } - if parsedPriority != originalPriority { - t.Errorf("Priority mismatch: expected %d, got %d", originalPriority, parsedPriority) - } - if !parsedCreation.Equal(originalCreation) { - t.Errorf("Creation time mismatch: expected %v, got %v", originalCreation, parsedCreation) - } - if parsedAction != originalAction { - t.Errorf("Action mismatch: expected %s, got %s", originalAction, parsedAction) - } + assert.Equal(t, originalKey, parsedKey, "Round-trip key should match original") + assert.Equal(t, originalPriority, parsedPriority, "Round-trip priority should match original") + assert.True(t, originalCreation.Equal(parsedCreation), "Round-trip creation time should match original") + assert.Equal(t, originalAction, parsedAction, "Round-trip action should match original") } From ab94e8fb81dc22d895d22f108e5c639ad31cb948 Mon Sep 17 00:00:00 2001 From: shuangkun Date: Mon, 29 Sep 2025 00:40:34 +0800 Subject: [PATCH 6/6] fix: ut. Signed-off-by: shuangkun --- workflow/controller/controller_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 9255c08be371..e87010c8da88 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -710,6 +710,7 @@ func TestParallelism(t *testing.T) { wfv1.MustUnmarshalWorkflow(` metadata: name: my-wf-0 + namespace: default spec: entrypoint: main templates: @@ -720,6 +721,7 @@ spec: wfv1.MustUnmarshalWorkflow(` metadata: name: my-wf-1 + namespace: default spec: entrypoint: main templates: @@ -730,6 +732,7 @@ spec: wfv1.MustUnmarshalWorkflow(` metadata: name: my-wf-2 + namespace: default spec: shutdown: Terminate entrypoint: main @@ -742,20 +745,25 @@ spec: ) defer cancel() + // Process throttle items first to handle throttling + assert.True(t, controller.processNextThrottleItem(ctx)) + assert.True(t, controller.processNextThrottleItem(ctx)) + assert.True(t, controller.processNextThrottleItem(ctx)) + assert.True(t, controller.processNextItem(ctx)) assert.True(t, controller.processNextItem(ctx)) assert.True(t, controller.processNextItem(ctx)) - expectWorkflow(ctx, controller, "my-wf-0", func(wf *wfv1.Workflow) { + expectNamespacedWorkflow(ctx, controller, "default", "my-wf-0", func(wf *wfv1.Workflow) { require.NotNil(t, wf) assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase) }) - expectWorkflow(ctx, controller, "my-wf-1", func(wf *wfv1.Workflow) { + expectNamespacedWorkflow(ctx, controller, "default", "my-wf-1", func(wf *wfv1.Workflow) { require.NotNil(t, wf) assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase) assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message) }) - expectWorkflow(ctx, controller, "my-wf-2", func(wf *wfv1.Workflow) { + expectNamespacedWorkflow(ctx, controller, "default", "my-wf-2", func(wf *wfv1.Workflow) { require.NotNil(t, wf) assert.Equal(t, wfv1.WorkflowFailed, wf.Status.Phase) })