diff --git a/.features/pending/cache-updated-workflow.md b/.features/pending/cache-updated-workflow.md new file mode 100644 index 000000000000..748cb525d7b6 --- /dev/null +++ b/.features/pending/cache-updated-workflow.md @@ -0,0 +1,6 @@ +Component: General +Issues: 13114 +Description: Fast cache workflows to avoid reconciling outdated objects. +Author: [Shuangkun Tian](https://github.com/shuangkun) + +Use a thread-safe cache.Store to cache the latest workflow. On read, compare fast-cache and informer resourceVersion and use the newer. diff --git a/docs/environment-variables.md b/docs/environment-variables.md index cd7d469e0df9..5ec5f2fc277d 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -32,7 +32,6 @@ This document outlines environment variables that can be used to customize behav | `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. | | `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. | | `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. | -| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. | | `HEALTHZ_AGE` | `time.Duration` | `5m` | How old a un-reconciled workflow is to report unhealthy. | | `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | `true` | Whether or not to index semaphores. | | `LEADER_ELECTION_IDENTITY` | `string` | Controller's `metadata.name` | The ID used for workflow controllers to elect a leader. | diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4598ac37038c..42752c5fbbad 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -153,6 +153,8 @@ type WorkflowController struct { recentCompletions recentCompletions // lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled lastUnreconciledWorkflows map[string]*wfv1.Workflow + // workflowFastStore provides fast access to latest workflow objects + workflowFastStore cache.Store } const ( @@ -228,6 +230,9 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli return nil, err } + // Initialize fast cache + wfc.workflowFastStore = cache.NewStore(cache.MetaNamespaceKeyFunc) + deprecation.Initialize(wfc.metrics.DeprecatedFeature) wfc.entrypoint = entrypoint.New(kubeclientset, wfc.Config.Images) @@ -711,7 +716,7 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { wfc.workflowKeyLock.Lock(key) defer wfc.workflowKeyLock.Unlock(key) - obj, ok := wfc.getWorkflowByKey(ctx, key) + obj, ok := wfc.getWorkflowByKeyWithCache(ctx, key) if !ok { return true } @@ -909,6 +914,60 @@ func (wfc *WorkflowController) recordCompletedWorkflow(key string) { } } +// updateWorkflowFastCache updates the fast cache with the latest workflow object +func (wfc *WorkflowController) updateWorkflowFastCache(wf *unstructured.Unstructured) { + // Add or update the workflow in the store (cache.Store is thread-safe) + if wfc.workflowFastStore == nil { + // initialize lazily for controllers constructed in tests + wfc.workflowFastStore = cache.NewStore(cache.MetaNamespaceKeyFunc) + } + if err := wfc.workflowFastStore.Add(wf); err != nil { + // best-effort cache; ignore errors to avoid impacting controller flow + _ = err // explicitly ignore error to satisfy linter + } +} + +// deleteWorkflowFromFastCache removes workflow from fast cache +func (wfc *WorkflowController) deleteWorkflowFromFastCache(key string) { + if wfc.workflowFastStore == nil { + return + } + if obj, exists, _ := wfc.workflowFastStore.GetByKey(key); exists { + _ = wfc.workflowFastStore.Delete(obj) + } +} + +// getWorkflowByKeyWithCache tries to get workflow from fast cache first, then falls back to informer +func (wfc *WorkflowController) getWorkflowByKeyWithCache(ctx context.Context, key string) (interface{}, bool) { + logger := logging.RequireLoggerFromContext(ctx) + + if wfc.wfInformer == nil || wfc.wfInformer.GetIndexer() == nil { + return nil, false + } + + // Get from informer cache first; if not present, we consider it not processable + objInf, infExists, err := wfc.wfInformer.GetIndexer().GetByKey(key) + if !infExists || err != nil { + logger.WithField("key", key).WithError(err).Error(ctx, "Failed to get workflow from informer") + return nil, false + } + + // Try fast cache; if newer than informer, prefer fast + if wfc.workflowFastStore != nil { + objFast, fastExists, _ := wfc.workflowFastStore.GetByKey(key) + if fastExists { + fastUn, okFast := objFast.(*unstructured.Unstructured) + infUn, okInf := objInf.(*unstructured.Unstructured) + if okFast && okInf { + if util.OutDateResourceVersion(infUn.GetResourceVersion(), fastUn.GetResourceVersion()) { + return fastUn.DeepCopy(), true + } + } + } + } + return objInf, true +} + // Returns true if the workflow given by key is in the recently completed // list. Will perform expiry cleanup before checking. func (wfc *WorkflowController) checkRecentlyCompleted(key string) bool { @@ -999,6 +1058,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) wfc.recordCompletedWorkflow(key) // no need to add to the queue - this workflow is done wfc.throttler.Remove(key) + // delete from fast cache + wfc.deleteWorkflowFromFastCache(key) } }, }, diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index b061a293ecb0..f630de3223d2 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -908,6 +908,72 @@ func TestIsArchivable(t *testing.T) { }) } +// makeUnstructuredCache creates a minimal unstructured Workflow with given rv. +func makeUnstructuredCache(rv string) *unstructured.Unstructured { + un := &unstructured.Unstructured{} + un.SetNamespace("default") + un.SetName("wf") + un.SetResourceVersion(rv) + return un +} + +// setupControllerForFastCacheTests initializes only the parts needed for cache tests. +func setupControllerForFastCacheTests(t *testing.T) *WorkflowController { + t.Helper() + wfc := &WorkflowController{} + wfc.workflowFastStore = cache.NewStore(cache.MetaNamespaceKeyFunc) + inf := cache.NewSharedIndexInformer(&cache.ListWatch{}, &unstructured.Unstructured{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + wfc.wfInformer = inf + return wfc +} + +func TestGetWorkflowByKeyWithCache_PreferFastWhenNewer(t *testing.T) { + wfc := setupControllerForFastCacheTests(t) + key := "default/wf" + // informer rv=10 + _ = wfc.wfInformer.GetStore().Add(makeUnstructuredCache("10")) + // fast rv=20 -> choose fast + wfc.updateWorkflowFastCache(makeUnstructuredCache("20")) + + obj, ok := wfc.getWorkflowByKeyWithCache(logging.TestContext(context.Background()), key) + if !ok { + t.Fatalf("expected object, got none") + } + if rv := obj.(*unstructured.Unstructured).GetResourceVersion(); rv != "20" { + t.Fatalf("expected rv=20 (fast), got %s", rv) + } +} + +func TestGetWorkflowByKeyWithCache_PreferInformerWhenNewerOrEqual(t *testing.T) { + wfc := setupControllerForFastCacheTests(t) + key := "default/wf" + // informer rv=10 + _ = wfc.wfInformer.GetStore().Add(makeUnstructuredCache("10")) + // fast rv=9 -> choose informer + wfc.updateWorkflowFastCache(makeUnstructuredCache("9")) + + obj, ok := wfc.getWorkflowByKeyWithCache(logging.TestContext(context.Background()), key) + if !ok { + t.Fatalf("expected object, got none") + } + if rv := obj.(*unstructured.Unstructured).GetResourceVersion(); rv != "10" { + t.Fatalf("expected rv=10 (informer), got %s", rv) + } +} + +func TestFastCacheUpdateAndDelete(t *testing.T) { + wfc := setupControllerForFastCacheTests(t) + key := "default/wf" + wfc.updateWorkflowFastCache(makeUnstructuredCache("1")) + if _, exists, _ := wfc.workflowFastStore.GetByKey(key); !exists { + t.Fatalf("expected fast cache to contain key after update") + } + wfc.deleteWorkflowFromFastCache(key) + if _, exists, _ := wfc.workflowFastStore.GetByKey(key); exists { + t.Fatalf("expected fast cache to delete key") + } +} + func TestReleaseAllWorkflowLocks(t *testing.T) { ctx := logging.TestContext(t.Context()) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 2f4fac1df744..919d1b2e101c 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -796,16 +796,10 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { woc.log.WithFields(logging.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info(ctx, "Workflow update successful") - switch os.Getenv("INFORMER_WRITE_BACK") { - // By default we write back (as per v2.11), this does not reduce errors, but does reduce - // conflicts and therefore we log fewer warning messages. - case "", "true": - if err := woc.writeBackToInformer(); err != nil { - woc.markWorkflowError(ctx, err) - return - } - case "false": - time.Sleep(1 * time.Second) + // Update fast cache with the latest workflow object immediately after successful update + // This ensures we have the most recent version available for subsequent processing + if wfUnstructured, err := wfutil.ToUnstructured(woc.wf); err == nil { + woc.controller.updateWorkflowFastCache(wfUnstructured) } // Make sure the workflow completed. @@ -841,18 +835,6 @@ func (woc *wfOperationCtx) deleteTaskResults(ctx context.Context) error { ) } -func (woc *wfOperationCtx) writeBackToInformer() error { - un, err := wfutil.ToUnstructured(woc.wf) - if err != nil { - return fmt.Errorf("failed to convert workflow to unstructured: %w", err) - } - err = woc.controller.wfInformer.GetStore().Update(un) - if err != nil { - return fmt.Errorf("failed to update informer store: %w", err) - } - return nil -} - // persistWorkflowSizeLimitErr will fail a the workflow with an error when we hit the resource size limit // See https://github.com/argoproj/argo-workflows/issues/913 func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) { diff --git a/workflow/util/util.go b/workflow/util/util.go index bbedd29ba6e0..b6817f10b568 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -1614,3 +1614,22 @@ func FindWaitCtrIndex(pod *apiv1.Pod) (int, error) { } return waitCtrIndex, nil } + +// OutDateResourceVersion checks whether the resourceVersion is outdated +func OutDateResourceVersion(currentRV, cachedRV string) bool { + // Parse both resourceVersions as integers + current, err2 := strconv.ParseInt(currentRV, 10, 64) + cached, err1 := strconv.ParseInt(cachedRV, 10, 64) + + // If either is invalid, assume currentRV is outdated + if err1 != nil || err2 != nil { + return false + } + + // If cached RV is much larger than current, then current is old + if current < cached { + return true + } + + return false +}