diff --git a/workflow/archive/archive_controller.go b/workflow/archive/archive_controller.go new file mode 100644 index 000000000000..2e5434697c9e --- /dev/null +++ b/workflow/archive/archive_controller.go @@ -0,0 +1,187 @@ +package archive + +import ( + "context" + "encoding/json" + "fmt" + "time" + + syncpkg "github.com/argoproj/pkg/sync" + log "github.com/sirupsen/logrus" + apierr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + runtimeutil "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "github.com/argoproj/argo-workflows/v3/persist/sqldb" + wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" + "github.com/argoproj/argo-workflows/v3/workflow/common" + "github.com/argoproj/argo-workflows/v3/workflow/hydrator" + "github.com/argoproj/argo-workflows/v3/workflow/metrics" + "github.com/argoproj/argo-workflows/v3/workflow/util" +) + +type Controller struct { + wfclientset wfclientset.Interface + wfInformer cache.SharedIndexInformer + wfArchiveQueue workqueue.RateLimitingInterface + hydrator hydrator.Interface + wfArchive sqldb.WorkflowArchive + workflowKeyLock *syncpkg.KeyLock + metrics *metrics.Metrics +} + +// NewController returns a new workflow archive controller +func NewController(ctx context.Context, wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer, metrics *metrics.Metrics, hydrator hydrator.Interface, wfArchive sqldb.WorkflowArchive, workflowKeyLock *syncpkg.KeyLock) *Controller { + controller := &Controller{ + wfclientset: wfClientset, + wfInformer: wfInformer, + wfArchiveQueue: metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), 
"workflow_archive_queue"), + metrics: metrics, + hydrator: hydrator, + wfArchive: wfArchive, + workflowKeyLock: workflowKeyLock, + } + + _, err := wfInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + un, ok := obj.(*unstructured.Unstructured) + // no need to check the `common.LabelKeyCompleted` as we already know it must be complete + return ok && un.GetLabels()[common.LabelKeyWorkflowArchivingStatus] == "Pending" + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + controller.wfArchiveQueue.Add(key) + } + }, + UpdateFunc: func(_, obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + controller.wfArchiveQueue.Add(key) + } + }, + }, + }) + if err != nil { + log.Fatal(err) + } + + return controller +} + +func (c *Controller) Run(stopCh <-chan struct{}, wfArchiveWorkers int) error { + defer runtimeutil.HandleCrash() + defer c.wfArchiveQueue.ShutDown() + log.Infof("Starting workflow archive controller (workflowArchiveWorkers %d)", wfArchiveWorkers) + go c.wfInformer.Run(stopCh) + if ok := cache.WaitForCacheSync(stopCh, c.wfInformer.HasSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + for i := 0; i < wfArchiveWorkers; i++ { + go wait.Until(c.runArchiveWorker, time.Second, stopCh) + } + log.Info("Started workflow archive controller") + <-stopCh + log.Info("Shutting workflow archive controller") + return nil +} + +func (c *Controller) runArchiveWorker() { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) 
+ + ctx := context.Background() + for c.processNextArchiveItem(ctx) { + } +} + +func (c *Controller) processNextArchiveItem(ctx context.Context) bool { + key, quit := c.wfArchiveQueue.Get() + if quit { + return false + } + defer c.wfArchiveQueue.Done(key) + + obj, exists, err := c.wfInformer.GetIndexer().GetByKey(key.(string)) + if err != nil { + log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer") + return true + } + if !exists { + return true + } + + c.archiveWorkflow(ctx, obj) + return true +} + +func (c *Controller) archiveWorkflow(ctx context.Context, obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Error("failed to get key for object") + return + } + (*c.workflowKeyLock).Lock(key) + defer (*c.workflowKeyLock).Unlock(key) + key, err = cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Error("failed to get key for object after locking") + return + } + err = c.archiveWorkflowAux(ctx, obj) + if err != nil { + log.WithField("key", key).WithError(err).Error("failed to archive workflow") + } +} + +func (c *Controller) archiveWorkflowAux(ctx context.Context, obj interface{}) error { + un, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil + } + wf, err := util.FromUnstructured(un) + if err != nil { + return fmt.Errorf("failed to convert to workflow from unstructured: %w", err) + } + err = c.hydrator.Hydrate(wf) + if err != nil { + return fmt.Errorf("failed to hydrate workflow: %w", err) + } + log.WithFields(log.Fields{"namespace": wf.Namespace, "workflow": wf.Name, "uid": wf.UID}).Info("archiving workflow") + err = c.wfArchive.ArchiveWorkflow(wf) + if err != nil { + return fmt.Errorf("failed to archive workflow: %w", err) + } + data, err := json.Marshal(map[string]interface{}{ + "metadata": metav1.ObjectMeta{ + Labels: map[string]string{ + common.LabelKeyWorkflowArchivingStatus: "Archived", + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to marshal 
patch: %w", err) + } + _, err = c.wfclientset.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Patch( + ctx, + un.GetName(), + types.MergePatchType, + data, + metav1.PatchOptions{}, + ) + if err != nil { + // from this point on we have successfully archived the workflow, and it is possible for the workflow to have actually + // been deleted, so it's not a problem to get a `IsNotFound` error + if apierr.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to archive workflow: %w", err) + } + return nil +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 50d222dcb55a..cb0f86b854aa 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -2,7 +2,6 @@ package controller import ( "context" - "encoding/json" "fmt" "os" "strconv" @@ -25,7 +24,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/types" runtimeutil "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -52,6 +50,7 @@ import ( "github.com/argoproj/argo-workflows/v3/util/diff" "github.com/argoproj/argo-workflows/v3/util/env" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" + "github.com/argoproj/argo-workflows/v3/workflow/archive" "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories" "github.com/argoproj/argo-workflows/v3/workflow/common" controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache" @@ -126,7 +125,6 @@ type WorkflowController struct { configMapInformer cache.SharedIndexInformer wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy - wfArchiveQueue workqueue.RateLimitingInterface throttler sync.Throttler workflowKeyLock syncpkg.KeyLock 
// used to lock workflows for exclusive modification or access session db.Session @@ -238,7 +236,6 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, &fixedItemIntervalRateLimiter{}, "workflow_queue") wfc.throttler = wfc.newThrottler() wfc.podCleanupQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), "pod_cleanup_queue") - wfc.wfArchiveQueue = wfc.metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultControllerRateLimiter(), "workflow_archive_queue") return &wfc, nil } @@ -269,6 +266,17 @@ func (wfc *WorkflowController) runCronController(ctx context.Context, cronWorkfl cronController.Run(ctx) } +// runArchiveController runs the workflow archive controller +func (wfc *WorkflowController) runArchiveController(ctx context.Context, wfArchiveWorkers int) { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + + archiveCtrl := archive.NewController(ctx, wfc.wfclientset, wfc.wfInformer, wfc.metrics, wfc.hydrator, wfc.wfArchive, &wfc.workflowKeyLock) + err := archiveCtrl.Run(ctx.Done(), wfArchiveWorkers) + if err != nil { + panic(err) + } +} + var indexers = cache.Indexers{ indexes.ClusterWorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyClusterWorkflowTemplate), indexes.CronWorkflowIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyCronWorkflow), @@ -361,15 +369,13 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.runGCcontroller(ctx, workflowTTLWorkers) go wfc.runCronController(ctx, cronWorkflowWorkers) + go wfc.runArchiveController(ctx, wfArchiveWorkers) go wait.Until(wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod, ctx.Done()) for i := 0; i < wfWorkers; i++ { go wait.Until(wfc.runWorker, time.Second, ctx.Done()) } - for i := 0; i < wfArchiveWorkers; i++ { - go wait.Until(wfc.runArchiveWorker, time.Second, ctx.Done()) - } if 
cacheGCPeriod != 0 { go wait.JitterUntilWithContext(ctx, wfc.syncAllCacheForGC, cacheGCPeriod, 0.0, true) } @@ -805,14 +811,6 @@ func (wfc *WorkflowController) runWorker() { } } -func (wfc *WorkflowController) runArchiveWorker() { - defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) - - ctx := context.Background() - for wfc.processNextArchiveItem(ctx) { - } -} - // processNextItem is the worker logic for handling workflow updates func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { key, quit := wfc.wfQueue.Get() @@ -896,26 +894,6 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } -func (wfc *WorkflowController) processNextArchiveItem(ctx context.Context) bool { - key, quit := wfc.wfArchiveQueue.Get() - if quit { - return false - } - defer wfc.wfArchiveQueue.Done(key) - - obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string)) - if err != nil { - log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer") - return true - } - if !exists { - return true - } - - wfc.archiveWorkflow(ctx, obj) - return true -} - func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) { obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key) if err != nil { @@ -1117,30 +1095,6 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) if err != nil { return err } - _, err = wfc.wfInformer.AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - un, ok := obj.(*unstructured.Unstructured) - // no need to check the `common.LabelKeyCompleted` as we already know it must be complete - return ok && un.GetLabels()[common.LabelKeyWorkflowArchivingStatus] == "Pending" - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - wfc.wfArchiveQueue.Add(key) - } - }, - UpdateFunc: func(_, obj 
interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - wfc.wfArchiveQueue.Add(key) - } - }, - }, - }) - if err != nil { - return err - } _, err = wfc.wfInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { wf, ok := obj.(*unstructured.Unstructured) @@ -1155,71 +1109,6 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) return nil } -func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - log.Error("failed to get key for object") - return - } - wfc.workflowKeyLock.Lock(key) - defer wfc.workflowKeyLock.Unlock(key) - key, err = cache.MetaNamespaceKeyFunc(obj) - if err != nil { - log.Error("failed to get key for object after locking") - return - } - err = wfc.archiveWorkflowAux(ctx, obj) - if err != nil { - log.WithField("key", key).WithError(err).Error("failed to archive workflow") - } -} - -func (wfc *WorkflowController) archiveWorkflowAux(ctx context.Context, obj interface{}) error { - un, ok := obj.(*unstructured.Unstructured) - if !ok { - return nil - } - wf, err := util.FromUnstructured(un) - if err != nil { - return fmt.Errorf("failed to convert to workflow from unstructured: %w", err) - } - err = wfc.hydrator.Hydrate(wf) - if err != nil { - return fmt.Errorf("failed to hydrate workflow: %w", err) - } - log.WithFields(log.Fields{"namespace": wf.Namespace, "workflow": wf.Name, "uid": wf.UID}).Info("archiving workflow") - err = wfc.wfArchive.ArchiveWorkflow(wf) - if err != nil { - return fmt.Errorf("failed to archive workflow: %w", err) - } - data, err := json.Marshal(map[string]interface{}{ - "metadata": metav1.ObjectMeta{ - Labels: map[string]string{ - common.LabelKeyWorkflowArchivingStatus: "Archived", - }, - }, - }) - if err != nil { - return fmt.Errorf("failed to marshal patch: %w", err) - } - _, err = 
wfc.wfclientset.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Patch( - ctx, - un.GetName(), - types.MergePatchType, - data, - metav1.PatchOptions{}, - ) - if err != nil { - // from this point on we have successfully archived the workflow, and it is possible for the workflow to have actually - // been deleted, so it's not a problem to get a `IsNotFound` error - if apierr.IsNotFound(err) { - return nil - } - return fmt.Errorf("failed to archive workflow: %w", err) - } - return nil -} - var ( incompleteReq, _ = labels.NewRequirement(common.LabelKeyCompleted, selection.Equals, []string{"false"}) workflowReq, _ = labels.NewRequirement(common.LabelKeyWorkflow, selection.Exists, nil)