From bbcdb47f3f83ffc33f20ebbcbe92dfce53636925 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 3 Jul 2025 17:19:23 +0800 Subject: [PATCH 1/2] issue 8344: constrain data path expose Signed-off-by: Lyndon-Li --- changelogs/unreleased/9064-Lyndon-Li | 1 + pkg/builder/pod_volume_backup_builder.go | 6 + pkg/builder/pod_volume_restore_builder.go | 6 + pkg/cmd/cli/nodeagent/server.go | 17 +- pkg/controller/data_download_controller.go | 43 +++- .../data_download_controller_test.go | 44 +++- pkg/controller/data_upload_controller.go | 40 +++- pkg/controller/data_upload_controller_test.go | 43 +++- .../pod_volume_backup_controller.go | 28 ++- .../pod_volume_backup_controller_test.go | 31 ++- .../pod_volume_restore_controller.go | 28 ++- .../pod_volume_restore_controller_test.go | 33 ++- pkg/exposer/types.go | 1 + pkg/exposer/vgdp_counter.go | 223 ++++++++++++++++++ pkg/exposer/vgdp_counter_test.go | 181 ++++++++++++++ pkg/nodeagent/node_agent.go | 3 + 16 files changed, 671 insertions(+), 57 deletions(-) create mode 100644 changelogs/unreleased/9064-Lyndon-Li create mode 100644 pkg/exposer/vgdp_counter.go create mode 100644 pkg/exposer/vgdp_counter_test.go diff --git a/changelogs/unreleased/9064-Lyndon-Li b/changelogs/unreleased/9064-Lyndon-Li new file mode 100644 index 0000000000..34392dd829 --- /dev/null +++ b/changelogs/unreleased/9064-Lyndon-Li @@ -0,0 +1 @@ +Fix #8344, add a mechanism to throttle the creation of data mover pods for DataUpload, DataDownload, PodVolumeBackup and PodVolumeRestore \ No newline at end of file diff --git a/pkg/builder/pod_volume_backup_builder.go b/pkg/builder/pod_volume_backup_builder.go index e16ed223f9..9db8beacd3 100644 --- a/pkg/builder/pod_volume_backup_builder.go +++ b/pkg/builder/pod_volume_backup_builder.go @@ -149,3 +149,9 @@ func (b *PodVolumeBackupBuilder) OwnerReference(ref metav1.OwnerReference) *PodV b.object.OwnerReferences = append(b.object.OwnerReferences, ref) return b } + +// Labels sets the PodVolumeBackup's Labels. +func (b *PodVolumeBackupBuilder) Labels(label map[string]string) *PodVolumeBackupBuilder { + b.object.Labels = label + return b +} diff --git a/pkg/builder/pod_volume_restore_builder.go b/pkg/builder/pod_volume_restore_builder.go index 5f54b3d336..50f277eac9 100644 --- a/pkg/builder/pod_volume_restore_builder.go +++ b/pkg/builder/pod_volume_restore_builder.go @@ -133,3 +133,9 @@ func (b *PodVolumeRestoreBuilder) Node(node string) *PodVolumeRestoreBuilder { b.object.Status.Node = node return b } + +// Labels sets the PodVolumeRestore's Labels.
+func (b *PodVolumeRestoreBuilder) Labels(label map[string]string) *PodVolumeRestoreBuilder { + b.object.Labels = label + return b +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 78dd1abcd8..76a04752d3 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -57,6 +57,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/constant" "github.com/vmware-tanzu/velero/pkg/controller" "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/util/filesystem" @@ -140,6 +141,7 @@ type nodeAgentServer struct { csiSnapshotClient *snapshotv1client.Clientset dataPathMgr *datapath.Manager dataPathConfigs *nodeagent.Configs + vgdpCounter *exposer.VgdpCounter } func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { @@ -300,12 +302,21 @@ func (s *nodeAgentServer) run() { } } - pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger) + if s.dataPathConfigs != nil && s.dataPathConfigs.LoadConcurrency != nil && s.dataPathConfigs.LoadConcurrency.WaitQueueLength > 0 { + if counter, err := exposer.StartVgdpCounter(s.ctx, s.mgr, s.dataPathConfigs.LoadConcurrency.WaitQueueLength); err != nil { + s.logger.WithError(err).Warnf("Failed to start VGDP counter, VGDP loads are not constrained") + } else { + s.vgdpCounter = counter + s.logger.Infof("VGDP loads are constrained with %d", s.dataPathConfigs.LoadConcurrency.WaitQueueLength) + } + } + + pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.vgdpCounter, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger) if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup) } - pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger) + pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.vgdpCounter, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger) if err := pvrReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } @@ -320,6 +331,7 @@ func (s *nodeAgentServer) run() { s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, + s.vgdpCounter, loadAffinity, backupPVCConfig, podResources, @@ -344,6 +356,7 @@ func (s *nodeAgentServer) run() { s.mgr, s.kubeClient, s.dataPathMgr, + s.vgdpCounter, loadAffinity, restorePVCConfig, podResources, diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 22f811af94..1047dc44e5 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -64,6 +64,7 @@ type DataDownloadReconciler struct { restoreExposer exposer.GenericRestoreExposer
nodeName string dataPathMgr *datapath.Manager + vgdpCounter *exposer.VgdpCounter loadAffinity []*kube.LoadAffinity restorePVCConfig nodeagent.RestorePVC podResources corev1api.ResourceRequirements @@ -77,6 +78,7 @@ func NewDataDownloadReconciler( mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, + counter *exposer.VgdpCounter, loadAffinity []*kube.LoadAffinity, restorePVCConfig nodeagent.RestorePVC, podResources corev1api.ResourceRequirements, @@ -95,6 +97,7 @@ func NewDataDownloadReconciler( restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), restorePVCConfig: restorePVCConfig, dataPathMgr: dataPathMgr, + vgdpCounter: counter, loadAffinity: loadAffinity, podResources: podResources, preparingTimeout: preparingTimeout, @@ -220,13 +223,26 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew { - log.Info("Data download starting") + if dd.Spec.Cancel { + log.Debugf("Data download is canceled in Phase %s", dd.Status.Phase) + + r.tryCancelDataDownload(ctx, dd, "") + + return ctrl.Result{}, nil + } + + if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) { + log.Debug("Data path initiation is constrained, requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } if _, err := r.getTargetPVC(ctx, dd); err != nil { log.WithField("error", err).Debugf("Cannot find target PVC for DataDownload yet. Retry later.") return ctrl.Result{Requeue: true}, nil } + log.Info("Data download starting") + accepted, err := r.acceptDataDownload(ctx, dd) if err != nil { return ctrl.Result{}, errors.Wrapf(err, "error accepting the data download %s", dd.Name) @@ -239,12 +255,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request log.Info("Data download is accepted") - if dd.Spec.Cancel { - log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) - return ctrl.Result{}, nil - } - exposeParam, err := r.setupExposeParam(dd) if err != nil { return r.errorOut(ctx, dd, err, "failed to set exposer parameters", log) @@ -312,7 +322,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request dd.Name, dd.Namespace, result.ByPod.HostingPod.Name, result.ByPod.HostingContainer, dd.Name, callbacks, false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { - log.Info("Data path instance is concurrent limited requeue later") + log.Debug("Data path instance is concurrent limited requeue later") return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } else { return r.errorOut(ctx, dd, err, "error to create data path", log) @@ -337,6 +347,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + delete(dd.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will data path close and retry", dd.Name) @@ -454,6 +466,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + delete(dd.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { 
log.WithError(err).Error("error updating data download status") @@ -504,6 +518,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + delete(dd.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating data download status") @@ -525,6 +541,8 @@ func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd * if message != "" { dataDownload.Status.Message = message } + + delete(dataDownload.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { @@ -702,6 +720,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v dd.Status.Message = errors.WithMessage(err, msg).Error() dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + delete(dd.Labels, exposer.ExposeOnGoingLabel) + return true }); patchErr != nil { log.WithError(patchErr).Error("error updating DataDownload status") @@ -724,6 +744,11 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted datadownload.Status.AcceptedByNode = r.nodeName datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()} + + if datadownload.Labels == nil { + datadownload.Labels = make(map[string]string) + } + datadownload.Labels[exposer.ExposeOnGoingLabel] = "true" } succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, updated, updateFunc) @@ -749,6 +774,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dd *velerov2alpha1api.DataDownload) { dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed dd.Status.Message = "timeout on preparing data download" + + delete(dd.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 8f629d4687..756fd03a88 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -136,6 +136,7 @@ func initDataDownloadReconcilerWithError(t *testing.T, objects []any, needError fakeKubeClient, dataPathMgr, nil, + nil, nodeagent.RestorePVC{}, corev1api.ResourceRequirements{}, "test-node", @@ -195,6 +196,7 @@ func TestDataDownloadReconcile(t *testing.T) { mockCancel bool mockClose bool needExclusiveUpdateError error + constrained bool expected *velerov2alpha1api.DataDownload expectDeleted bool expectCancelRecord bool @@ -295,6 +297,20 @@ func TestDataDownloadReconcile(t *testing.T) { name: "Unknown data download status", dd: dataDownloadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(), }, + { + name: "dd is cancel on new", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + expectCancelRecord: true, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), + }, + { + name: "new dd but constrained", + dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + constrained: true, + expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 
5}, + }, { name: "new dd but no target PVC", dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), @@ -308,12 +324,6 @@ func TestDataDownloadReconcile(t *testing.T) { expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), expectedErr: "error accepting the data download datadownload-1: exclusive-update-error", }, - { - name: "dd is cancel on accepted", - dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(), - }, { name: "dd is accepted but setup expose param failed", dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Result(), @@ -488,6 +498,10 @@ func TestDataDownloadReconcile(t *testing.T) { r.cancelledDataDownload[test.dd.Name] = test.sportTime.Time } + if test.constrained { + r.vgdpCounter = &exposer.VgdpCounter{} + } + funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload if test.needExclusiveUpdateError != nil { funcExclusiveUpdateDataDownload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataDownload, func(*velerov2alpha1api.DataDownload)) (bool, error) { @@ -571,13 +585,13 @@ func TestDataDownloadReconcile(t *testing.T) { assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) } - if test.expected != nil || test.expectDeleted { - dd := velerov2alpha1api.DataDownload{} - err = r.client.Get(ctx, kbclient.ObjectKey{ - Name: test.dd.Name, - Namespace: test.dd.Namespace, - }, &dd) + dd := velerov2alpha1api.DataDownload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.dd.Name, + Namespace: test.dd.Namespace, + }, &dd) + if test.expected != nil || test.expectDeleted { if test.expectDeleted { assert.True(t, apierrors.IsNotFound(err)) } else { @@ -601,6 +615,12 @@ func TestDataDownloadReconcile(t *testing.T) { } else { assert.Empty(t, r.cancelledDataDownload) } + + if isDataDownloadInFinalState(&dd) || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + assert.NotContains(t, dd.Labels, exposer.ExposeOnGoingLabel) + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { + assert.Contains(t, dd.Labels, exposer.ExposeOnGoingLabel) + } }) } } diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 9f9563d6be..164999670b 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -74,6 +74,7 @@ type DataUploadReconciler struct { logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager + vgdpCounter *exposer.VgdpCounter loadAffinity []*kube.LoadAffinity backupPVCConfig map[string]nodeagent.BackupPVC podResources corev1api.ResourceRequirements @@ -88,6 +89,7 @@ func NewDataUploadReconciler( kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, dataPathMgr *datapath.Manager, + counter *exposer.VgdpCounter, loadAffinity []*kube.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, podResources corev1api.ResourceRequirements, @@ -113,6 +115,7 @@ func NewDataUploadReconciler( ), }, dataPathMgr: dataPathMgr, + vgdpCounter: counter, loadAffinity: loadAffinity, backupPVCConfig: backupPVCConfig, podResources: podResources, @@ 
-241,6 +244,19 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew { + if du.Spec.Cancel { + log.Debugf("Data upload is canceled in Phase %s", du.Status.Phase) + + r.tryCancelDataUpload(ctx, du, "") + + return ctrl.Result{}, nil + } + + if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) { + log.Debug("Data path initiation is constrained, requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } + log.Info("Data upload starting") accepted, err := r.acceptDataUpload(ctx, du) @@ -255,11 +271,6 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) log.Info("Data upload is accepted") - if du.Spec.Cancel { - r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) - return ctrl.Result{}, nil - } - exposeParam, err := r.setupExposeParam(du) if err != nil { return r.errorOut(ctx, du, err, "failed to set exposer parameters", log) @@ -330,7 +341,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { - log.Info("Data path instance is concurrent limited requeue later") + log.Debug("Data path instance is concurrent limited requeue later") return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } else { return r.errorOut(ctx, du, err, "error to create data path", log) @@ -356,6 +367,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS) + delete(du.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) @@ -481,6 +494,8 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp du.Status.Message = "volume was empty so no data was upload" } + delete(du.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating DataUpload status") @@ -531,6 +546,8 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + delete(du.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating DataUpload status") @@ -552,6 +569,8 @@ func (r *DataUploadReconciler) tryCancelDataUpload(ctx context.Context, du *vele if message != "" { dataUpload.Status.Message = message } + + delete(dataUpload.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { @@ -760,6 +779,8 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel } du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + delete(du.Labels, exposer.ExposeOnGoingLabel) + return true }); patchErr != nil { log.WithError(patchErr).Error("error updating DataUpload status") @@ -781,6 +802,11 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted dataUpload.Status.AcceptedByNode = r.nodeName dataUpload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()} + + if dataUpload.Labels == nil { + dataUpload.Labels = make(map[string]string) + } + 
dataUpload.Labels[exposer.ExposeOnGoingLabel] = "true" } succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, updated, updateFunc) @@ -807,6 +833,8 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(du *velerov2alpha1api.DataUpload) { du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed du.Status.Message = "timeout on preparing data upload" + + delete(du.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 44bb2a170f..e1d4ad13c3 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -241,6 +241,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, + nil, map[string]nodeagent.BackupPVC{}, corev1api.ResourceRequirements{}, testclocks.NewFakeClock(now), @@ -258,7 +259,6 @@ func dataUploadBuilder() *builder.DataUploadBuilder { VolumeSnapshot: "fake-volume-snapshot", } return builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName). - Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}). BackupStorageLocation("bsl-loc"). DataMover("velero"). SnapshotType("CSI").SourceNamespace("fake-ns").SourcePVC("test-pvc").CSISnapshot(csi) @@ -361,6 +361,7 @@ func TestReconcile(t *testing.T) { getExposeNil bool fsBRInitErr error fsBRStartErr error + constrained bool expectedErr string expectedResult *ctrl.Result expectDataPath bool @@ -464,6 +465,19 @@ func TestReconcile(t *testing.T) { expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), expectedErr: "unknown type type of snapshot exposer is not exist", }, + { + name: "du is cancel on new", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), + expectCancelRecord: true, + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), + }, + { + name: "new du but constrained", + du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + constrained: true, + expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, { name: "new du but accept failed", du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), @@ -471,11 +485,6 @@ func TestReconcile(t *testing.T) { expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), expectedErr: "error accepting the data upload dataupload-1: exclusive-update-error", }, - { - name: "du is cancel on accepted", - du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(), - expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(), - }, { name: "du is accepted but setup expose param failed on getting PVC", du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(), @@ -636,6 +645,10 @@ func TestReconcile(t *testing.T) { r.cancelledDataUpload[test.du.Name] = test.sportTime.Time } + if test.constrained { + r.vgdpCounter = &exposer.VgdpCounter{} + } + if 
test.du.Spec.SnapshotType == fakeSnapshotType { r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr, test.exposeErr, test.getExposeErr, test.getExposeNil}} } else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { @@ -683,13 +696,13 @@ func TestReconcile(t *testing.T) { assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) } - if test.expected != nil || test.expectDeleted { - du := velerov2alpha1api.DataUpload{} - err = r.client.Get(ctx, kbclient.ObjectKey{ - Name: test.du.Name, - Namespace: test.du.Namespace, - }, &du) + du := velerov2alpha1api.DataUpload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.du.Name, + Namespace: test.du.Namespace, + }, &du) + if test.expected != nil || test.expectDeleted { if test.expectDeleted { assert.True(t, apierrors.IsNotFound(err)) } else { @@ -713,6 +726,12 @@ func TestReconcile(t *testing.T) { } else { assert.Empty(t, r.cancelledDataUpload) } + + if isDataUploadInFinalState(&du) || du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + assert.NotContains(t, du.Labels, exposer.ExposeOnGoingLabel) + } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { + assert.Contains(t, du.Labels, exposer.ExposeOnGoingLabel) + } }) } } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 344ccba97f..2f1f42cc89 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -59,7 +59,7 @@ const ( // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance func NewPodVolumeBackupReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, - nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements, + counter *exposer.VgdpCounter, nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { return &PodVolumeBackupReconciler{ client: client, @@ -71,6 +71,7 @@ func NewPodVolumeBackupReconciler(client client.Client, mgr manager.Manager, kub metrics: metrics, podResources: podResources, dataPathMgr: dataPathMgr, + vgdpCounter: counter, preparingTimeout: preparingTimeout, resourceTimeout: resourceTimeout, exposer: exposer.NewPodVolumeExposer(kubeClient, logger), @@ -90,6 +91,7 @@ type PodVolumeBackupReconciler struct { logger logrus.FieldLogger podResources corev1api.ResourceRequirements dataPathMgr *datapath.Manager + vgdpCounter *exposer.VgdpCounter preparingTimeout time.Duration resourceTimeout time.Duration cancelledPVB map[string]time.Time @@ -212,6 +214,11 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } + if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) { + log.Debug("Data path initiation is constrained, requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } + log.Info("Accepting PVB") if err := r.acceptPodVolumeBackup(ctx, pvb); err != nil { @@ -278,7 +285,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ pvb.Name, pvb.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvb.Name, callbacks, 
false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { - log.Info("Data path instance is concurrent limited requeue later") + log.Debug("Data path instance is concurrent limited requeue later") return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } else { return r.errorOut(ctx, pvb, err, "error to create data path", log) @@ -304,6 +311,8 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + delete(pvb.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Warnf("Failed to update PVB %s to InProgress, will data path close and retry", pvb.Name) @@ -370,6 +379,11 @@ func (r *PodVolumeBackupReconciler) acceptPodVolumeBackup(ctx context.Context, p pvb.Status.AcceptedTimestamp = &metav1.Time{Time: r.clock.Now()} pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseAccepted + if pvb.Labels == nil { + pvb.Labels = make(map[string]string) + } + pvb.Labels[exposer.ExposeOnGoingLabel] = "true" + return true }) } @@ -386,6 +400,8 @@ func (r *PodVolumeBackupReconciler) tryCancelPodVolumeBackup(ctx context.Context if message != "" { pvb.Status.Message = message } + + delete(pvb.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { @@ -428,6 +444,8 @@ func (r *PodVolumeBackupReconciler) onPrepareTimeout(ctx context.Context, pvb *v succeeded, err := funcExclusiveUpdatePodVolumeBackup(ctx, r.client, pvb, func(pvb *velerov1api.PodVolumeBackup) { pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed pvb.Status.Message = "timeout on preparing PVB" + + delete(pvb.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { @@ -508,6 +526,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam pvb.Status.Message = "volume was empty so no snapshot was taken" } + delete(pvb.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating PVB status") @@ -565,6 +585,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, nam } pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + delete(pvb.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating PVB status on cancel") @@ -736,6 +758,8 @@ func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1 pvb.Status.StartTimestamp = &metav1.Time{Time: time} } + delete(pvb.Labels, exposer.ExposeOnGoingLabel) + return true }); patchErr != nil { log.WithError(patchErr).Warn("error updating PVB status") diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index f05dcc29ae..f27a5a77ff 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -144,6 +144,7 @@ func initPVBReconcilerWithError(needError ...error) (*PodVolumeBackupReconciler, nil, fakeKubeClient, dataPathMgr, + nil, "test-node", time.Minute*5, time.Minute, @@ -224,6 +225,7 @@ func TestPVBReconcile(t *testing.T) { getExposeNil bool fsBRInitErr error fsBRStartErr error + constrained bool expectedErr string expectedResult *ctrl.Result expectDataPath bool @@ -317,6 +319,13 @@ func TestPVBReconcile(t *testing.T) { name: "Unknown pvb status", pvb: pvbBuilder().Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(), }, + { + name: "new pvb but constrained", + pvb: 
pvbBuilder().Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), + constrained: true, + expected: pvbBuilder().Finalizers([]string{PodVolumeFinalizer}).Result(), + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, { name: "new pvb but accept failed", pvb: pvbBuilder().Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(), @@ -480,6 +489,10 @@ func TestPVBReconcile(t *testing.T) { r.cancelledPVB[test.pvb.Name] = test.sportTime.Time } + if test.constrained { + r.vgdpCounter = &exposer.VgdpCounter{} + } + if test.needMockExposer { r.exposer = &fakePvbExposer{r.client, r.clock, test.peekErr, test.exposeErr, test.getExposeErr, test.getExposeNil} } @@ -525,13 +538,13 @@ func TestPVBReconcile(t *testing.T) { assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) } - if test.expected != nil || test.expectDeleted { - pvb := velerov1api.PodVolumeBackup{} - err = r.client.Get(ctx, client.ObjectKey{ - Name: test.pvb.Name, - Namespace: test.pvb.Namespace, - }, &pvb) + pvb := velerov1api.PodVolumeBackup{} + err = r.client.Get(ctx, client.ObjectKey{ + Name: test.pvb.Name, + Namespace: test.pvb.Namespace, + }, &pvb) + if test.expected != nil || test.expectDeleted { if test.expectDeleted { assert.True(t, apierrors.IsNotFound(err)) } else { @@ -555,6 +568,12 @@ func TestPVBReconcile(t *testing.T) { } else { assert.Empty(t, r.cancelledPVB) } + + if isPVBInFinalState(&pvb) || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress { + assert.NotContains(t, pvb.Labels, exposer.ExposeOnGoingLabel) + } else if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted { + assert.Contains(t, pvb.Labels, exposer.ExposeOnGoingLabel) + } }) } } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 8894bcedaa..bf3d4da47b 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -55,7 +55,7 @@ import ( ) func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, - nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements, + counter *exposer.VgdpCounter, nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ client: client, @@ -66,6 +66,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, ku clock: &clocks.RealClock{}, podResources: podResources, dataPathMgr: dataPathMgr, + vgdpCounter: counter, preparingTimeout: preparingTimeout, resourceTimeout: resourceTimeout, exposer: exposer.NewPodVolumeExposer(kubeClient, logger), @@ -83,6 +84,7 @@ type PodVolumeRestoreReconciler struct { podResources corev1api.ResourceRequirements exposer exposer.PodVolumeExposer dataPathMgr *datapath.Manager + vgdpCounter *exposer.VgdpCounter preparingTimeout time.Duration resourceTimeout time.Duration cancelledPVR map[string]time.Time @@ -210,6 +212,11 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, nil } + if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) { + log.Debug("Data path initiation is constrained, requeue later") + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } + log.Info("Accepting 
PVR") if err := r.acceptPodVolumeRestore(ctx, pvr); err != nil { @@ -282,7 +289,7 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { - log.Info("Data path instance is concurrent limited requeue later") + log.Debug("Data path instance is concurrent limited requeue later") return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } else { return r.errorOut(ctx, pvr, err, "error to create data path", log) @@ -306,6 +313,8 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + delete(pvr.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Warnf("Failed to update PVR %s to InProgress, will data path close and retry", pvr.Name) @@ -373,6 +382,11 @@ func (r *PodVolumeRestoreReconciler) acceptPodVolumeRestore(ctx context.Context, pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseAccepted pvr.Status.Node = r.nodeName + if pvr.Labels == nil { + pvr.Labels = make(map[string]string) + } + pvr.Labels[exposer.ExposeOnGoingLabel] = "true" + return true }) } @@ -389,6 +403,8 @@ func (r *PodVolumeRestoreReconciler) tryCancelPodVolumeRestore(ctx context.Conte if message != "" { pvr.Status.Message = message } + + delete(pvr.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { @@ -433,6 +449,8 @@ func (r *PodVolumeRestoreReconciler) onPrepareTimeout(ctx context.Context, pvr * succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) { pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed pvr.Status.Message = "timeout on preparing PVR" + + delete(pvr.Labels, exposer.ExposeOnGoingLabel) }) if err != nil { @@ -499,6 +517,8 @@ func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1 pvr.Status.Message = errors.WithMessage(err, msg).Error() pvr.Status.CompletionTimestamp = &metav1.Time{Time: time} + delete(pvr.Labels, exposer.ExposeOnGoingLabel) + return true }); patchErr != nil { log.WithError(patchErr).Warn("error updating PVR status") @@ -749,6 +769,8 @@ func (r *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, na pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + delete(pvr.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating PVR status") @@ -798,6 +820,8 @@ func (r *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, na } pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + delete(pvr.Labels, exposer.ExposeOnGoingLabel) + return true }); err != nil { log.WithError(err).Error("error updating PVR status on cancel") diff --git a/pkg/controller/pod_volume_restore_controller_test.go b/pkg/controller/pod_volume_restore_controller_test.go index 39effe10f9..af6637e6dd 100644 --- a/pkg/controller/pod_volume_restore_controller_test.go +++ b/pkg/controller/pod_volume_restore_controller_test.go @@ -617,7 +617,7 @@ func initPodVolumeRestoreReconcilerWithError(objects []runtime.Object, cliObj [] dataPathMgr := datapath.NewManager(1) - return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, 
time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil + return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, "test-node", time.Minute*5, time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil } func TestPodVolumeRestoreReconcile(t *testing.T) { @@ -669,6 +669,7 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { mockCancel bool mockClose bool needExclusiveUpdateError error + constrained bool expected *velerov1api.PodVolumeRestore expectDeleted bool expectCancelRecord bool @@ -765,6 +766,14 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { name: "Unknown pvr status", pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(), }, + { + name: "new pvr but constrained", + pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(), + targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(), + constrained: true, + expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(), + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, { name: "new pvr but accept failed", pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(), @@ -942,6 +951,10 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { r.cancelledPVR[test.pvr.Name] = test.sportTime.Time } + if test.constrained { + r.vgdpCounter = &exposer.VgdpCounter{} + } + funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore if test.needExclusiveUpdateError != nil { funcExclusiveUpdatePodVolumeRestore = func(context.Context, kbclient.Client, *velerov1api.PodVolumeRestore, func(*velerov1api.PodVolumeRestore)) (bool, error) { @@ -1029,13 +1042,13 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter) } - if test.expected != nil || test.expectDeleted { - pvr := velerov1api.PodVolumeRestore{} - err = r.client.Get(ctx, kbclient.ObjectKey{ - Name: test.pvr.Name, - Namespace: test.pvr.Namespace, - }, &pvr) + pvr := velerov1api.PodVolumeRestore{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.pvr.Name, + Namespace: test.pvr.Namespace, + }, &pvr) + if test.expected != nil || test.expectDeleted { if test.expectDeleted { assert.True(t, apierrors.IsNotFound(err)) } else { @@ -1059,6 +1072,12 @@ func TestPodVolumeRestoreReconcile(t *testing.T) { } else { assert.Empty(t, r.cancelledPVR) } + + if isPVRInFinalState(&pvr) || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress { + assert.NotContains(t, pvr.Labels, exposer.ExposeOnGoingLabel) + } else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted { + assert.Contains(t, pvr.Labels, exposer.ExposeOnGoingLabel) + } }) } } diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go index 5cdb9d4971..7f38d7329c 100644 --- a/pkg/exposer/types.go +++ b/pkg/exposer/types.go @@ -26,6 +26,7 @@ const ( podGroupLabel = "velero.io/exposer-pod-group" podGroupSnapshot = "snapshot-exposer" podGroupGenericRestore = "generic-restore-exposer" + ExposeOnGoingLabel =
"velero.io/expose-on-going" ) // ExposeResult defines the result of expose. diff --git a/pkg/exposer/vgdp_counter.go b/pkg/exposer/vgdp_counter.go new file mode 100644 index 0000000000..e8ab49b5bf --- /dev/null +++ b/pkg/exposer/vgdp_counter.go @@ -0,0 +1,223 @@ +package exposer + +import ( + "context" + "sync/atomic" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + + "sigs.k8s.io/controller-runtime/pkg/manager" + + ctlclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type dynamicQueueLength struct { + queueLength int + changeId uint64 +} + +type VgdpCounter struct { + client ctlclient.Client + allowedQueueLength int + + duState dynamicQueueLength + ddState dynamicQueueLength + pvbState dynamicQueueLength + pvrState dynamicQueueLength + + duCacheState dynamicQueueLength + ddCacheState dynamicQueueLength + pvbCacheState dynamicQueueLength + pvrCacheState dynamicQueueLength +} + +func StartVgdpCounter(ctx context.Context, mgr manager.Manager, queueLength int) (*VgdpCounter, error) { + counter := &VgdpCounter{ + client: mgr.GetClient(), + allowedQueueLength: queueLength, + } + + atomic.StoreUint64(&counter.duState.changeId, 1) + atomic.StoreUint64(&counter.ddState.changeId, 1) + atomic.StoreUint64(&counter.pvbState.changeId, 1) + atomic.StoreUint64(&counter.pvrState.changeId, 1) + + if err := counter.initListeners(ctx, mgr); err != nil { + return nil, err + } + + return counter, nil +} + +func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) error { + duInformer, err := mgr.GetCache().GetInformer(ctx, &velerov2alpha1api.DataUpload{}) + if err != nil { + return errors.Wrap(err, "error getting du informer") + } + + if _, err := duInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj any) { + oldDu := oldObj.(*velerov2alpha1api.DataUpload) + newDu := newObj.(*velerov2alpha1api.DataUpload) + + if oldDu.Status.Phase == newDu.Status.Phase { + return + } + + if newDu.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted || + oldDu.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared || + oldDu.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted && newDu.Status.Phase != velerov2alpha1api.DataUploadPhasePrepared { + atomic.AddUint64(&w.duState.changeId, 1) + } + }, + }, + ); err != nil { + return errors.Wrap(err, "error registering du handler") + } + + ddInformer, err := mgr.GetCache().GetInformer(ctx, &velerov2alpha1api.DataDownload{}) + if err != nil { + return errors.Wrap(err, "error getting dd informer") + } + + if _, err := ddInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj any) { + oldDd := oldObj.(*velerov2alpha1api.DataDownload) + newDd := newObj.(*velerov2alpha1api.DataDownload) + + if oldDd.Status.Phase == newDd.Status.Phase { + return + } + + if newDd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted || + oldDd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared || + oldDd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted && newDd.Status.Phase != velerov2alpha1api.DataDownloadPhasePrepared { + atomic.AddUint64(&w.ddState.changeId, 1) + } + }, + }, + ); err != nil { + return errors.Wrap(err, "error registering dd handler") + } + + pvbInformer, err := 
mgr.GetCache().GetInformer(ctx, &velerov1api.PodVolumeBackup{}) + if err != nil { + return errors.Wrap(err, "error getting PVB informer") + } + + if _, err := pvbInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj any) { + oldPvb := oldObj.(*velerov1api.PodVolumeBackup) + newPvb := newObj.(*velerov1api.PodVolumeBackup) + + if oldPvb.Status.Phase == newPvb.Status.Phase { + return + } + + if newPvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted || + oldPvb.Status.Phase == velerov1api.PodVolumeBackupPhasePrepared || + oldPvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted && newPvb.Status.Phase != velerov1api.PodVolumeBackupPhasePrepared { + atomic.AddUint64(&w.pvbState.changeId, 1) + } + }, + }, + ); err != nil { + return errors.Wrap(err, "error registering PVB handler") + } + + pvrInformer, err := mgr.GetCache().GetInformer(ctx, &velerov1api.PodVolumeRestore{}) + if err != nil { + return errors.Wrap(err, "error getting PVR informer") + } + + if _, err := pvrInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj any) { + oldPvr := oldObj.(*velerov1api.PodVolumeRestore) + newPvr := newObj.(*velerov1api.PodVolumeRestore) + + if oldPvr.Status.Phase == newPvr.Status.Phase { + return + } + + if newPvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted || + oldPvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared || + oldPvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted && newPvr.Status.Phase != velerov1api.PodVolumeRestorePhasePrepared { + atomic.AddUint64(&w.pvrState.changeId, 1) + } + }, + }, + ); err != nil { + return errors.Wrap(err, "error registering PVR handler") + } + + return nil +} + +func (w *VgdpCounter) IsConstrained(ctx context.Context, log logrus.FieldLogger) bool { + id := atomic.LoadUint64(&w.duState.changeId) + if id != w.duCacheState.changeId { + duList := &velerov2alpha1api.DataUploadList{} + if err := w.client.List(ctx, duList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { + log.WithError(err).Warn("Failed to list data uploads, skip counting") + } else { + w.duCacheState.queueLength = len(duList.Items) + w.duCacheState.changeId = id + + log.Infof("Query queue length for du %d", w.duCacheState.queueLength) + } + } + + id = atomic.LoadUint64(&w.ddState.changeId) + if id != w.ddCacheState.changeId { + ddList := &velerov2alpha1api.DataDownloadList{} + if err := w.client.List(ctx, ddList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { + log.WithError(err).Warn("Failed to list data downloads, skip counting") + } else { + w.ddCacheState.queueLength = len(ddList.Items) + w.ddCacheState.changeId = id + + log.Infof("Query queue length for dd %d", w.ddCacheState.queueLength) + } + } + + id = atomic.LoadUint64(&w.pvbState.changeId) + if id != w.pvbCacheState.changeId { + pvbList := &velerov1api.PodVolumeBackupList{} + if err := w.client.List(ctx, pvbList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { + log.WithError(err).Warn("Failed to list PVB, skip counting") + } else { + w.pvbCacheState.queueLength = len(pvbList.Items) + w.pvbCacheState.changeId = id + + log.Infof("Query queue length for pvb %d", w.pvbCacheState.queueLength) + } + } + + id = atomic.LoadUint64(&w.pvrState.changeId) + if id != 
w.pvrCacheState.changeId { + pvrList := &velerov1api.PodVolumeRestoreList{} + if err := w.client.List(ctx, pvrList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { + log.WithError(err).Warn("Failed to list PVR, skip counting") + } else { + w.pvrCacheState.queueLength = len(pvrList.Items) + w.pvrCacheState.changeId = id + + log.Infof("Query queue length for pvr %d", w.pvrCacheState.queueLength) + } + } + + existing := w.duCacheState.queueLength + w.ddCacheState.queueLength + w.pvbCacheState.queueLength + w.pvrCacheState.queueLength + constrained := existing >= w.allowedQueueLength + + return constrained +} diff --git a/pkg/exposer/vgdp_counter_test.go b/pkg/exposer/vgdp_counter_test.go new file mode 100644 index 0000000000..5b654d94fb --- /dev/null +++ b/pkg/exposer/vgdp_counter_test.go @@ -0,0 +1,181 @@ +package exposer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/vmware-tanzu/velero/pkg/builder" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" +) + +func TestIsConstrained(t *testing.T) { + tests := []struct { + name string + counter VgdpCounter + kubeClientObj []client.Object + getErr bool + expected bool + }{ + { + name: "no change, constrained", + counter: VgdpCounter{}, + expected: true, + }, + { + name: "no change, not constrained", + counter: VgdpCounter{allowedQueueLength: 1}, + }, + { + name: "change in du, get failed", + counter: VgdpCounter{ + allowedQueueLength: 1, + duState: dynamicQueueLength{0, 1}, + }, + getErr: true, + }, + { + name: "change in du, constrained", + counter: VgdpCounter{ + allowedQueueLength: 1, + duState: dynamicQueueLength{0, 1}, + }, + kubeClientObj: []client.Object{ + builder.ForDataUpload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), + }, + expected: true, + }, + { + name: "change in dd, get failed", + counter: VgdpCounter{ + allowedQueueLength: 1, + ddState: dynamicQueueLength{0, 1}, + }, + getErr: true, + }, + { + name: "change in dd, constrained", + counter: VgdpCounter{ + allowedQueueLength: 1, + ddState: dynamicQueueLength{0, 1}, + }, + kubeClientObj: []client.Object{ + builder.ForDataDownload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), + }, + expected: true, + }, + { + name: "change in pvb, get failed", + counter: VgdpCounter{ + allowedQueueLength: 1, + pvbState: dynamicQueueLength{0, 1}, + }, + getErr: true, + }, + { + name: "change in pvb, constrained", + counter: VgdpCounter{ + allowedQueueLength: 1, + pvbState: dynamicQueueLength{0, 1}, + }, + kubeClientObj: []client.Object{ + builder.ForPodVolumeBackup("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), + }, + expected: true, + }, + { + name: "change in pvr, get failed", + counter: VgdpCounter{ + allowedQueueLength: 1, + pvrState: dynamicQueueLength{0, 1}, + }, + getErr: true, + }, + { + name: "change in pvr, constrained", + counter: VgdpCounter{ + allowedQueueLength: 1, + pvrState: dynamicQueueLength{0, 1}, + }, + kubeClientObj: []client.Object{ + 
builder.ForPodVolumeRestore("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), }, expected: true, }, { name: "change in du, pvb, not constrained", counter: VgdpCounter{ allowedQueueLength: 3, duState: dynamicQueueLength{0, 1}, pvbState: dynamicQueueLength{0, 1}, }, kubeClientObj: []client.Object{ builder.ForDataUpload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), builder.ForPodVolumeBackup("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), }, }, { name: "change in dd, pvr, constrained", counter: VgdpCounter{ allowedQueueLength: 1, ddState: dynamicQueueLength{0, 1}, pvrState: dynamicQueueLength{0, 1}, }, kubeClientObj: []client.Object{ builder.ForDataDownload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), builder.ForPodVolumeRestore("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(), }, expected: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { scheme := runtime.NewScheme() if !test.getErr { err := velerov1api.AddToScheme(scheme) require.NoError(t, err) err = velerov2alpha1api.AddToScheme(scheme) require.NoError(t, err) } test.counter.client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(test.kubeClientObj...).Build() result := test.counter.IsConstrained(context.TODO(), velerotest.NewLogger()) assert.Equal(t, test.expected, result) if !test.getErr { assert.Equal(t, test.counter.duState.changeId, test.counter.duCacheState.changeId) assert.Equal(t, test.counter.ddState.changeId, test.counter.ddCacheState.changeId) assert.Equal(t, test.counter.pvbState.changeId, test.counter.pvbCacheState.changeId) assert.Equal(t, test.counter.pvrState.changeId, test.counter.pvrCacheState.changeId) } else { or := test.counter.duState.changeId != test.counter.duCacheState.changeId if !or { or = test.counter.ddState.changeId != test.counter.ddCacheState.changeId } if !or { or = test.counter.pvbState.changeId != test.counter.pvbCacheState.changeId } if !or { or = test.counter.pvrState.changeId != test.counter.pvrCacheState.changeId } assert.True(t, or) } }) } } diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 3d11590854..cddf1813a5 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -61,6 +61,9 @@ type LoadConcurrency struct { // PerNodeConfig specifies the concurrency number to nodes matched by rules PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"` + + // WaitQueueLength specifies the max number of loads that are waiting to be processed + WaitQueueLength int `json:"waitQueueLength,omitempty"` } type LoadAffinity struct { From ecdb330fa3a7af75266981bdf2f4c9a58fd788f2 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 4 Jul 2025 17:49:57 +0800 Subject: [PATCH 2/2] issue 8344: constrain data path expose Signed-off-by: Lyndon-Li --- pkg/cmd/cli/nodeagent/server.go | 6 ++--- pkg/exposer/vgdp_counter.go | 42 ++++++++++++++++---------------- pkg/exposer/vgdp_counter_test.go | 16 ++++++------ pkg/nodeagent/node_agent.go | 4 +-- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 76a04752d3..ce4ad20902 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -302,12 +302,12 @@ func (s
*nodeAgentServer) run() { } } - if s.dataPathConfigs != nil && s.dataPathConfigs.LoadConcurrency != nil && s.dataPathConfigs.LoadConcurrency.WaitQueueLength > 0 { - if counter, err := exposer.StartVgdpCounter(s.ctx, s.mgr, s.dataPathConfigs.LoadConcurrency.WaitQueueLength); err != nil { + if s.dataPathConfigs != nil && s.dataPathConfigs.LoadConcurrency != nil && s.dataPathConfigs.LoadConcurrency.PrepareQueueLength > 0 { + if counter, err := exposer.StartVgdpCounter(s.ctx, s.mgr, s.dataPathConfigs.LoadConcurrency.PrepareQueueLength); err != nil { s.logger.WithError(err).Warnf("Failed to start VGDP counter, VGDP loads are not constrained") } else { s.vgdpCounter = counter - s.logger.Infof("VGDP loads are constrained with %d", s.dataPathConfigs.LoadConcurrency.WaitQueueLength) + s.logger.Infof("VGDP loads are constrained with %d", s.dataPathConfigs.LoadConcurrency.PrepareQueueLength) } } diff --git a/pkg/exposer/vgdp_counter.go b/pkg/exposer/vgdp_counter.go index e8ab49b5bf..cf6737c147 100644 --- a/pkg/exposer/vgdp_counter.go +++ b/pkg/exposer/vgdp_counter.go @@ -19,7 +19,7 @@ import ( type dynamicQueueLength struct { queueLength int - changeId uint64 + changeID uint64 } type VgdpCounter struct { @@ -43,10 +43,10 @@ func StartVgdpCounter(ctx context.Context, mgr manager.Manager, queueLength int) allowedQueueLength: queueLength, } - atomic.StoreUint64(&counter.duState.changeId, 1) - atomic.StoreUint64(&counter.ddState.changeId, 1) - atomic.StoreUint64(&counter.pvbState.changeId, 1) - atomic.StoreUint64(&counter.pvrState.changeId, 1) + atomic.StoreUint64(&counter.duState.changeID, 1) + atomic.StoreUint64(&counter.ddState.changeID, 1) + atomic.StoreUint64(&counter.pvbState.changeID, 1) + atomic.StoreUint64(&counter.pvrState.changeID, 1) if err := counter.initListeners(ctx, mgr); err != nil { return nil, err @@ -74,7 +74,7 @@ func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) er if newDu.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted || oldDu.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared || oldDu.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted && newDu.Status.Phase != velerov2alpha1api.DataUploadPhasePrepared { - atomic.AddUint64(&w.duState.changeId, 1) + atomic.AddUint64(&w.duState.changeID, 1) } }, }, @@ -100,7 +100,7 @@ func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) er if newDd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted || oldDd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared || oldDd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted && newDd.Status.Phase != velerov2alpha1api.DataDownloadPhasePrepared { - atomic.AddUint64(&w.ddState.changeId, 1) + atomic.AddUint64(&w.ddState.changeID, 1) } }, }, @@ -126,7 +126,7 @@ func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) er if newPvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted || oldPvb.Status.Phase == velerov1api.PodVolumeBackupPhasePrepared || oldPvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted && newPvb.Status.Phase != velerov1api.PodVolumeBackupPhasePrepared { - atomic.AddUint64(&w.pvbState.changeId, 1) + atomic.AddUint64(&w.pvbState.changeID, 1) } }, }, @@ -152,7 +152,7 @@ func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) er if newPvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted || oldPvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared || oldPvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted &&
newPvr.Status.Phase != velerov1api.PodVolumeRestorePhasePrepared { - atomic.AddUint64(&w.pvrState.changeId, 1) + atomic.AddUint64(&w.pvrState.changeID, 1) } }, }, @@ -164,53 +164,53 @@ func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) er } func (w *VgdpCounter) IsConstrained(ctx context.Context, log logrus.FieldLogger) bool { - id := atomic.LoadUint64(&w.duState.changeId) - if id != w.duCacheState.changeId { + id := atomic.LoadUint64(&w.duState.changeID) + if id != w.duCacheState.changeID { duList := &velerov2alpha1api.DataUploadList{} if err := w.client.List(ctx, duList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { log.WithError(err).Warn("Failed to list data uploads, skip counting") } else { w.duCacheState.queueLength = len(duList.Items) - w.duCacheState.changeId = id + w.duCacheState.changeID = id log.Infof("Query queue length for du %d", w.duCacheState.queueLength) } } - id = atomic.LoadUint64(&w.ddState.changeId) - if id != w.ddCacheState.changeId { + id = atomic.LoadUint64(&w.ddState.changeID) + if id != w.ddCacheState.changeID { ddList := &velerov2alpha1api.DataDownloadList{} if err := w.client.List(ctx, ddList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { log.WithError(err).Warn("Failed to list data downloads, skip counting") } else { w.ddCacheState.queueLength = len(ddList.Items) - w.ddCacheState.changeId = id + w.ddCacheState.changeID = id log.Infof("Query queue length for dd %d", w.ddCacheState.queueLength) } } - id = atomic.LoadUint64(&w.pvbState.changeId) - if id != w.pvbCacheState.changeId { + id = atomic.LoadUint64(&w.pvbState.changeID) + if id != w.pvbCacheState.changeID { pvbList := &velerov1api.PodVolumeBackupList{} if err := w.client.List(ctx, pvbList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { log.WithError(err).Warn("Failed to list PVB, skip counting") } else { w.pvbCacheState.queueLength = len(pvbList.Items) - w.pvbCacheState.changeId = id + w.pvbCacheState.changeID = id log.Infof("Query queue length for pvb %d", w.pvbCacheState.queueLength) } } - id = atomic.LoadUint64(&w.pvrState.changeId) - if id != w.pvrCacheState.changeId { + id = atomic.LoadUint64(&w.pvrState.changeID) + if id != w.pvrCacheState.changeID { pvrList := &velerov1api.PodVolumeRestoreList{} if err := w.client.List(ctx, pvrList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil { log.WithError(err).Warn("Failed to list PVR, skip counting") } else { w.pvrCacheState.queueLength = len(pvrList.Items) - w.pvrCacheState.changeId = id + w.pvrCacheState.changeID = id log.Infof("Query queue length for pvr %d", w.pvrCacheState.queueLength) } diff --git a/pkg/exposer/vgdp_counter_test.go b/pkg/exposer/vgdp_counter_test.go index 5b654d94fb..68a57537b9 100644 --- a/pkg/exposer/vgdp_counter_test.go +++ b/pkg/exposer/vgdp_counter_test.go @@ -156,22 +156,22 @@ func TestIsConstrained(t *testing.T) { assert.Equal(t, test.expected, result) if !test.getErr { - assert.Equal(t, test.counter.duState.changeId, test.counter.duCacheState.changeId) - assert.Equal(t, test.counter.ddState.changeId, test.counter.ddCacheState.changeId) - assert.Equal(t, test.counter.pvbState.changeId, test.counter.pvbCacheState.changeId) - assert.Equal(t, 
test.counter.pvrState.changeId, test.counter.pvrCacheState.changeId) + assert.Equal(t, test.counter.duState.changeID, test.counter.duCacheState.changeID) + assert.Equal(t, test.counter.ddState.changeID, test.counter.ddCacheState.changeID) + assert.Equal(t, test.counter.pvbState.changeID, test.counter.pvbCacheState.changeID) + assert.Equal(t, test.counter.pvrState.changeID, test.counter.pvrCacheState.changeID) } else { - or := test.counter.duState.changeId != test.counter.duCacheState.changeId + or := test.counter.duState.changeID != test.counter.duCacheState.changeID if !or { - or = test.counter.ddState.changeId != test.counter.ddCacheState.changeId + or = test.counter.ddState.changeID != test.counter.ddCacheState.changeID } if !or { - or = test.counter.pvbState.changeId != test.counter.pvbCacheState.changeId + or = test.counter.pvbState.changeID != test.counter.pvbCacheState.changeID } if !or { - or = test.counter.pvrState.changeId != test.counter.pvrCacheState.changeId + or = test.counter.pvrState.changeID != test.counter.pvrCacheState.changeID } assert.True(t, or) diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index cddf1813a5..03006aeb66 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -62,8 +62,8 @@ type LoadConcurrency struct { // PerNodeConfig specifies the concurrency number to nodes matched by rules PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"` - // WaitQueueLength specifies the max number of loads that are waiting for process - WaitQueueLength int `json:"waitQueueLength,omitempty"` + // PrepareQueueLength specifies the max number of loads that are under expose + PrepareQueueLength int `json:"prepareQueueLength,omitempty"` } type LoadAffinity struct {
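
For context, below is a minimal sketch of how a reconciler is expected to consume the counter. The helper name and the 5-second requeue interval are illustrative assumptions; only StartVgdpCounter, IsConstrained, the nil-counter convention, and ExposeOnGoingLabel come from the series above.

package controller

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/vmware-tanzu/velero/pkg/exposer"
)

// delayIfConstrained is a hypothetical helper showing the gating pattern this
// series enables: before exposing a load, a reconciler that was handed a
// non-nil VgdpCounter asks whether the number of ongoing exposes (objects
// carrying ExposeOnGoingLabel) has reached prepareQueueLength, and requeues
// instead of creating another data mover pod when it has. A nil counter
// (prepareQueueLength unset or <= 0) means exposes are never constrained.
func delayIfConstrained(ctx context.Context, counter *exposer.VgdpCounter, log logrus.FieldLogger) (ctrl.Result, bool) {
	if counter != nil && counter.IsConstrained(ctx, log) {
		log.Info("Prepare queue is full, delaying data path expose")
		// The 5s requeue interval is an assumption for illustration only.
		return ctrl.Result{RequeueAfter: 5 * time.Second}, true
	}
	return ctrl.Result{}, false
}

Note that the counter avoids a List call on every reconcile: the informer handlers only bump an atomic change ID when a DataUpload, DataDownload, PodVolumeBackup, or PodVolumeRestore enters or leaves the Accepted/Prepared window, and IsConstrained re-lists a resource type only when that ID differs from the cached one.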