diff --git a/.features/pending/add-throttlequeue-to-speedup-eventhandle.md b/.features/pending/add-throttlequeue-to-speedup-eventhandle.md new file mode 100644 index 000000000000..e46b2b3e109d --- /dev/null +++ b/.features/pending/add-throttlequeue-to-speedup-eventhandle.md @@ -0,0 +1,10 @@ +Component: General +Issues: 14791 +Description: Add wfThrottleQueue to accelerate event handling. +Author: [Shuangkun Tian](https://github.com/shuangkun) + +In large-scale scenarios, the throttler's concurrent count calculation can become a bottleneck. +This feature improves performance by decoupling event reception from processing. +The new `wfThrottleQueue` allows the controller to handle workflow events more efficiently: the informer event handlers only enqueue throttle operations, and dedicated throttle workers apply them to the throttler, separately from the main workflow processing queue. +This reduces contention and improves throughput under high load conditions. +The feature is automatically enabled, and the number of throttle workers can be configured using the `--workflow-throttle-workers` flag (default: 8). \ No newline at end of file diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 03cd207c999c..6c482b240130 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -58,6 +58,7 @@ func NewRootCommand() *cobra.Command { podCleanupWorkers int // --pod-cleanup-workers cronWorkflowWorkers int // --cron-workflow-workers workflowArchiveWorkers int // --workflow-archive-workers + workflowThrottleWorkers int // --workflow-throttle-workers burst int qps float32 namespaced bool // --namespaced @@ -123,7 +124,7 @@ func NewRootCommand() *cobra.Command { log.Info(ctx, "Leader election is turned off. 
Running in single-instance mode") log.WithField("id", "single-instance").Info(ctx, "starting leading") - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers, workflowThrottleWorkers) go wfController.RunPrometheusServer(ctx, false) } else { nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY") @@ -160,7 +161,7 @@ func NewRootCommand() *cobra.Command { OnStartedLeading: func(ctx context.Context) { dummyCancel() wg.Wait() - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers, workflowThrottleWorkers) wg.Add(1) go func() { wfController.RunPrometheusServer(ctx, false) @@ -203,6 +204,7 @@ func NewRootCommand() *cobra.Command { command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers") command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers") command.Flags().IntVar(&workflowArchiveWorkers, "workflow-archive-workers", 8, "Number of workflow archive workers") + command.Flags().IntVar(&workflowThrottleWorkers, "workflow-throttle-workers", 8, "Number of workflow throttle workers") command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.") command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second") command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4598ac37038c..7245565c46e6 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -126,6 +126,7 @@ type WorkflowController struct { 
configMapInformer cache.SharedIndexInformer wfQueue workqueue.TypedRateLimitingInterface[string] wfArchiveQueue workqueue.TypedRateLimitingInterface[string] + wfThrottleQueue workqueue.TypedRateLimitingInterface[string] throttler sync.Throttler workflowKeyLock syncpkg.KeyLock // used to lock workflows for exclusive modification or access session db.Session @@ -234,6 +235,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli workqueue.SetProvider(wfc.metrics) // must execute SetProvider before we create the queues wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, &fixedItemIntervalRateLimiter{}, "workflow_queue") wfc.throttler = wfc.newThrottler() + wfc.wfThrottleQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultTypedControllerRateLimiter[string](), "workflow_throttle_queue") wfc.wfArchiveQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultTypedControllerRateLimiter[string](), "workflow_archive_queue") return &wfc, nil @@ -279,7 +281,7 @@ var indexers = cache.Indexers{ } // Run starts a Workflow resource controller -func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, wfArchiveWorkers int) { +func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, wfArchiveWorkers, wfThrottleWorkers int) { defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) 
logger := logging.RequireLoggerFromContext(ctx) @@ -303,6 +305,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo "podCleanup": podCleanupWorkers, "cronWorkflowWorkers": cronWorkflowWorkers, "workflowArchive": wfArchiveWorkers, + "workflowThrottle": wfThrottleWorkers, }).Info(ctx, "Current Worker Numbers") wfc.wfInformer = util.NewWorkflowInformer(ctx, wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) @@ -389,6 +392,12 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo for i := 0; i < wfArchiveWorkers; i++ { go wait.UntilWithContext(archiveCtx, wfc.runArchiveWorker, time.Second) } + + throttleCtx, _ := logger.WithField("component", "throttle_worker").InContext(ctx) + for i := 0; i < wfThrottleWorkers; i++ { + go wait.UntilWithContext(throttleCtx, wfc.runThrottleWorker, time.Second) + } + if cacheGCPeriod != 0 { go wait.JitterUntilWithContext(ctx, wfc.syncAllCacheForGC, cacheGCPeriod, 0.0, true) } @@ -700,6 +709,13 @@ func (wfc *WorkflowController) runArchiveWorker(ctx context.Context) { } } +func (wfc *WorkflowController) runThrottleWorker(ctx context.Context) { + defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) 
+ + for wfc.processNextThrottleItem(ctx) { + } +} + // processNextItem is the worker logic for handling workflow updates func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { key, quit := wfc.wfQueue.Get() @@ -808,6 +824,38 @@ func (wfc *WorkflowController) processNextArchiveItem(ctx context.Context) bool return true } +func (wfc *WorkflowController) processNextThrottleItem(ctx context.Context) bool { + throttleKey, quit := wfc.wfThrottleQueue.Get() + if quit { + return false + } + defer wfc.wfThrottleQueue.Done(throttleKey) + + logger := logging.RequireLoggerFromContext(ctx) + key, priority, creation, action := sync.ParseThrottleKey(throttleKey) + if key == "" { + logger.WithField("throttleKey", throttleKey).Warn(ctx, "Failed to parse throttle key") + return true + } + + logger.WithField("key", key). + WithField("action", action). + WithField("priority", priority). + WithField("creation", creation). + Debug(ctx, "Processing throttle item") + + switch action { + case sync.ThrottleActionAdd, sync.ThrottleActionUpdate: + wfc.throttler.Add(key, priority, creation) + case sync.ThrottleActionDelete: + wfc.throttler.Remove(key) + default: + logger.WithField("action", action).Warn(ctx, "Unknown throttle action") + } + + return true +} + func (wfc *WorkflowController) getWorkflowByKey(ctx context.Context, key string) (interface{}, bool) { logger := logging.RequireLoggerFromContext(ctx) obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key) @@ -955,7 +1003,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) // for a new workflow, we do not want to rate limit its execution using AddRateLimited wfc.wfQueue.AddAfter(key, wfc.Config.InitialDelay.Duration) priority, creation := getWfPriority(obj) - wfc.throttler.Add(key, priority, creation) + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, priority, creation, sync.ThrottleActionAdd)) } }, // This function is called when an updated (we already know about this object) 
@@ -970,7 +1018,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) if err == nil { wfc.wfQueue.AddRateLimited(key) priority, creation := getWfPriority(new) - wfc.throttler.Add(key, priority, creation) + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, priority, creation, sync.ThrottleActionUpdate)) } }, // This function is called when an object is to be removed @@ -997,8 +1045,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) if err == nil { wfc.releaseAllWorkflowLocks(ctx, obj) wfc.recordCompletedWorkflow(key) - // no need to add to the queue - this workflow is done - wfc.throttler.Remove(key) + wfc.wfThrottleQueue.Add(sync.NewThrottleKey(key, 0, time.Now(), sync.ThrottleActionDelete)) } }, }, diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index b061a293ecb0..e87010c8da88 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -315,6 +315,7 @@ func newController(ctx context.Context, options ...interface{}) (context.CancelF wfc.metrics, testExporter, _ = metrics.CreateDefaultTestMetrics(ctx) wfc.entrypoint = entrypoint.New(kube, wfc.Config.Images) wfc.wfQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) + wfc.wfThrottleQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) wfc.throttler = wfc.newThrottler() wfc.rateLimiter = wfc.newRateLimiter() } @@ -709,6 +710,7 @@ func TestParallelism(t *testing.T) { wfv1.MustUnmarshalWorkflow(` metadata: name: my-wf-0 + namespace: default spec: entrypoint: main templates: @@ -719,6 +721,7 @@ spec: wfv1.MustUnmarshalWorkflow(` metadata: name: my-wf-1 + namespace: default spec: entrypoint: main templates: @@ -729,6 +732,7 @@ spec: wfv1.MustUnmarshalWorkflow(` metadata: name: my-wf-2 + namespace: default spec: shutdown: Terminate entrypoint: main @@ -741,20 +745,25 @@ spec: ) defer 
cancel() + // Drain the three queued throttle items first so the throttler has seen every workflow before the workflow queue is processed + assert.True(t, controller.processNextThrottleItem(ctx)) + assert.True(t, controller.processNextThrottleItem(ctx)) + assert.True(t, controller.processNextThrottleItem(ctx)) + assert.True(t, controller.processNextItem(ctx)) assert.True(t, controller.processNextItem(ctx)) assert.True(t, controller.processNextItem(ctx)) - expectWorkflow(ctx, controller, "my-wf-0", func(wf *wfv1.Workflow) { + expectNamespacedWorkflow(ctx, controller, "default", "my-wf-0", func(wf *wfv1.Workflow) { require.NotNil(t, wf) assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase) }) - expectWorkflow(ctx, controller, "my-wf-1", func(wf *wfv1.Workflow) { + expectNamespacedWorkflow(ctx, controller, "default", "my-wf-1", func(wf *wfv1.Workflow) { require.NotNil(t, wf) assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase) assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message) }) - expectWorkflow(ctx, controller, "my-wf-2", func(wf *wfv1.Workflow) { + expectNamespacedWorkflow(ctx, controller, "default", "my-wf-2", func(wf *wfv1.Workflow) { require.NotNil(t, wf) assert.Equal(t, wfv1.WorkflowFailed, wf.Status.Phase) }) diff --git a/workflow/sync/throttle_key.go b/workflow/sync/throttle_key.go new file mode 100644 index 000000000000..230f9692a919 --- /dev/null +++ b/workflow/sync/throttle_key.go @@ -0,0 +1,47 @@ +package sync + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// workflowThrottleKey is the string form of one throttle operation, used as the item type of the throttle queue. +// Format: "workflowKey/priority/creationTime/action", where workflowKey is itself "namespace/name" and creationTime is RFC3339. +type workflowThrottleKey = string + +// ThrottleAction names the operation (add, update or delete) that a throttle key carries. +type ThrottleAction string + +const ( + ThrottleActionAdd ThrottleAction = "add" + ThrottleActionUpdate ThrottleAction = "update" + ThrottleActionDelete ThrottleAction = "delete" +) + +// NewThrottleKey creates a throttle key with workflow key, priority, creation 
time and action +func NewThrottleKey(key string, priority int32, creation time.Time, action ThrottleAction) workflowThrottleKey { + // RFC3339 contains no "/" separator and is the layout ParseThrottleKey parses; sub-second precision is not preserved + return fmt.Sprintf("%s/%d/%s/%s", key, priority, creation.Format(time.RFC3339), action) +} + +// ParseThrottleKey parses a throttle key back to its components. It requires exactly five "/"-separated parts (a two-part workflow key, priority, RFC3339 creation time, action) and returns zero values for every result when the key is malformed; the action string itself is not validated. +func ParseThrottleKey(throttleKey workflowThrottleKey) (key string, priority int32, creation time.Time, action ThrottleAction) { + parts := strings.Split(throttleKey, "/") + if len(parts) != 5 { + return "", 0, time.Time{}, "" + } + key = fmt.Sprintf("%s/%s", parts[0], parts[1]) + priority64, err := strconv.ParseInt(parts[2], 10, 32) + if err != nil { + return "", 0, time.Time{}, "" + } + priority = int32(priority64) + creation, err = time.Parse(time.RFC3339, parts[3]) + if err != nil { + return "", 0, time.Time{}, "" + } + action = ThrottleAction(parts[4]) + return key, priority, creation, action +} diff --git a/workflow/sync/throttle_key_test.go b/workflow/sync/throttle_key_test.go new file mode 100644 index 000000000000..3963a902702b --- /dev/null +++ b/workflow/sync/throttle_key_test.go @@ -0,0 +1,72 @@ +package sync + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewThrottleKey(t *testing.T) { + key := "test-namespace/test-workflow" + priority := int32(5) + creation := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC) + action := ThrottleActionAdd + + throttleKey := NewThrottleKey(key, priority, creation, action) + expected := "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add" + assert.Equal(t, expected, throttleKey, "Throttle key should match expected format") +} + +func TestParseThrottleKey(t *testing.T) { + throttleKey := "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add" + key, priority, creation, action := ParseThrottleKey(throttleKey) + + expectedKey := "test-namespace/test-workflow" + expectedPriority := int32(5) + expectedCreation := time.Date(2023, 1, 1, 
12, 0, 0, 0, time.UTC) + expectedAction := ThrottleActionAdd + + assert.Equal(t, expectedKey, key, "Parsed key should match expected") + assert.Equal(t, expectedPriority, priority, "Parsed priority should match expected") + assert.Equal(t, expectedCreation, creation, "Parsed creation time should match expected") + assert.Equal(t, expectedAction, action, "Parsed action should match expected") +} + +func TestParseThrottleKeyInvalid(t *testing.T) { + tests := []struct { + name string + throttleKey string + }{ + {"empty", ""}, + {"too few parts", "test-namespace/test-workflow"}, + {"too many parts", "test-namespace/test-workflow/5/2023-01-01T12:00:00Z/add/extra"}, + {"invalid priority", "test-namespace/test-workflow/invalid/2023-01-01T12:00:00Z/add"}, + {"invalid time", "test-namespace/test-workflow/5/invalid-time/add"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + key, priority, creation, action := ParseThrottleKey(tt.throttleKey) + assert.Empty(t, key, "Key should be empty for invalid throttle key") + assert.Equal(t, int32(0), priority, "Priority should be 0 for invalid throttle key") + assert.True(t, creation.IsZero(), "Creation time should be zero for invalid throttle key") + assert.Empty(t, action, "Action should be empty for invalid throttle key") + }) + } +} + +func TestThrottleKeyRoundTrip(t *testing.T) { + originalKey := "test-namespace/test-workflow" + originalPriority := int32(10) + originalCreation := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + originalAction := ThrottleActionUpdate + + throttleKey := NewThrottleKey(originalKey, originalPriority, originalCreation, originalAction) + parsedKey, parsedPriority, parsedCreation, parsedAction := ParseThrottleKey(throttleKey) + + assert.Equal(t, originalKey, parsedKey, "Round-trip key should match original") + assert.Equal(t, originalPriority, parsedPriority, "Round-trip priority should match original") + assert.True(t, originalCreation.Equal(parsedCreation), "Round-trip creation 
time should match original") + assert.Equal(t, originalAction, parsedAction, "Round-trip action should match original") +}