Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/9064-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix #8344, add a mechanism to throttle the creation of data mover pods for DataUpload, DataDownload, PodVolumeBackup and PodVolumeRestore
6 changes: 6 additions & 0 deletions pkg/builder/pod_volume_backup_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,9 @@ func (b *PodVolumeBackupBuilder) OwnerReference(ref metav1.OwnerReference) *PodV
b.object.OwnerReferences = append(b.object.OwnerReferences, ref)
return b
}

// Labels sets the PodVolumeBackup's Labels.
// The supplied map is assigned as-is (not copied or merged), so it replaces
// any labels previously set on the object under construction. The builder
// itself is returned so calls can be chained.
func (b *PodVolumeBackupBuilder) Labels(label map[string]string) *PodVolumeBackupBuilder {
b.object.Labels = label
return b
}
6 changes: 6 additions & 0 deletions pkg/builder/pod_volume_restore_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,9 @@ func (b *PodVolumeRestoreBuilder) Node(node string) *PodVolumeRestoreBuilder {
b.object.Status.Node = node
return b
}

// Labels sets the PodVolumeRestore's Labels.
// The supplied map is assigned as-is (not copied or merged), so it replaces
// any labels previously set on the object under construction. The builder
// itself is returned so calls can be chained.
func (b *PodVolumeRestoreBuilder) Labels(label map[string]string) *PodVolumeRestoreBuilder {
b.object.Labels = label
return b
}
17 changes: 15 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
Expand Down Expand Up @@ -140,6 +141,7 @@
csiSnapshotClient *snapshotv1client.Clientset
dataPathMgr *datapath.Manager
dataPathConfigs *nodeagent.Configs
vgdpCounter *exposer.VgdpCounter
}

func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
Expand Down Expand Up @@ -300,12 +302,21 @@
}
}

pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger)
if s.dataPathConfigs != nil && s.dataPathConfigs.LoadConcurrency != nil && s.dataPathConfigs.LoadConcurrency.PrepareQueueLength > 0 {
if counter, err := exposer.StartVgdpCounter(s.ctx, s.mgr, s.dataPathConfigs.LoadConcurrency.PrepareQueueLength); err != nil {
s.logger.WithError(err).Warnf("Failed to start VGDP counter, VDGP loads are not constrained")
} else {
s.vgdpCounter = counter
s.logger.Infof("VGDP loads are constrained with %d", s.dataPathConfigs.LoadConcurrency.PrepareQueueLength)
}

Check warning on line 311 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L305-L311

Added lines #L305 - L311 were not covered by tests
}

pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.vgdpCounter, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger)

Check warning on line 314 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L314

Added line #L314 was not covered by tests
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
}

pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger)
pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.vgdpCounter, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger)

Check warning on line 319 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L319

Added line #L319 was not covered by tests
if err := pvrReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
Expand All @@ -320,6 +331,7 @@
s.kubeClient,
s.csiSnapshotClient.SnapshotV1(),
s.dataPathMgr,
s.vgdpCounter,

Check warning on line 334 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L334

Added line #L334 was not covered by tests
loadAffinity,
backupPVCConfig,
podResources,
Expand All @@ -344,6 +356,7 @@
s.mgr,
s.kubeClient,
s.dataPathMgr,
s.vgdpCounter,

Check warning on line 359 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L359

Added line #L359 was not covered by tests
loadAffinity,
restorePVCConfig,
podResources,
Expand Down
43 changes: 35 additions & 8 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type DataDownloadReconciler struct {
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
vgdpCounter *exposer.VgdpCounter
loadAffinity []*kube.LoadAffinity
restorePVCConfig nodeagent.RestorePVC
podResources corev1api.ResourceRequirements
Expand All @@ -77,6 +78,7 @@ func NewDataDownloadReconciler(
mgr manager.Manager,
kubeClient kubernetes.Interface,
dataPathMgr *datapath.Manager,
counter *exposer.VgdpCounter,
loadAffinity []*kube.LoadAffinity,
restorePVCConfig nodeagent.RestorePVC,
podResources corev1api.ResourceRequirements,
Expand All @@ -95,6 +97,7 @@ func NewDataDownloadReconciler(
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr,
vgdpCounter: counter,
loadAffinity: loadAffinity,
podResources: podResources,
preparingTimeout: preparingTimeout,
Expand Down Expand Up @@ -220,13 +223,26 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew {
log.Info("Data download starting")
if dd.Spec.Cancel {
log.Debugf("Data download is canceled in Phase %s", dd.Status.Phase)

r.tryCancelDataDownload(ctx, dd, "")

return ctrl.Result{}, nil
}

if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) {
log.Debug("Data path initiation is constrained, requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}

if _, err := r.getTargetPVC(ctx, dd); err != nil {
log.WithField("error", err).Debugf("Cannot find target PVC for DataDownload yet. Retry later.")
return ctrl.Result{Requeue: true}, nil
}

log.Info("Data download starting")

accepted, err := r.acceptDataDownload(ctx, dd)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error accepting the data download %s", dd.Name)
Expand All @@ -239,12 +255,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Data download is accepted")

if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}

exposeParam, err := r.setupExposeParam(dd)
if err != nil {
return r.errorOut(ctx, dd, err, "failed to set exposer parameters", log)
Expand Down Expand Up @@ -312,7 +322,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
dd.Name, dd.Namespace, result.ByPod.HostingPod.Name, result.ByPod.HostingContainer, dd.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
log.Debug("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else {
return r.errorOut(ctx, dd, err, "error to create data path", log)
Expand All @@ -337,6 +347,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}

delete(dd.Labels, exposer.ExposeOnGoingLabel)

return true
}); err != nil {
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will data path close and retry", dd.Name)
Expand Down Expand Up @@ -454,6 +466,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

delete(dd.Labels, exposer.ExposeOnGoingLabel)

return true
}); err != nil {
log.WithError(err).Error("error updating data download status")
Expand Down Expand Up @@ -504,6 +518,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

delete(dd.Labels, exposer.ExposeOnGoingLabel)

return true
}); err != nil {
log.WithError(err).Error("error updating data download status")
Expand All @@ -525,6 +541,8 @@ func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd *
if message != "" {
dataDownload.Status.Message = message
}

delete(dataDownload.Labels, exposer.ExposeOnGoingLabel)
})

if err != nil {
Expand Down Expand Up @@ -702,6 +720,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}

delete(dd.Labels, exposer.ExposeOnGoingLabel)

return true
}); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
Expand All @@ -724,6 +744,11 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
datadownload.Status.AcceptedByNode = r.nodeName
datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}

if datadownload.Labels == nil {
datadownload.Labels = make(map[string]string)
}
datadownload.Labels[exposer.ExposeOnGoingLabel] = "true"
}

succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, updated, updateFunc)
Expand All @@ -749,6 +774,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"

delete(dd.Labels, exposer.ExposeOnGoingLabel)
})

if err != nil {
Expand Down
44 changes: 32 additions & 12 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initDataDownloadReconcilerWithError(t *testing.T, objects []any, needError
fakeKubeClient,
dataPathMgr,
nil,
nil,
nodeagent.RestorePVC{},
corev1api.ResourceRequirements{},
"test-node",
Expand Down Expand Up @@ -195,6 +196,7 @@ func TestDataDownloadReconcile(t *testing.T) {
mockCancel bool
mockClose bool
needExclusiveUpdateError error
constrained bool
expected *velerov2alpha1api.DataDownload
expectDeleted bool
expectCancelRecord bool
Expand Down Expand Up @@ -295,6 +297,20 @@ func TestDataDownloadReconcile(t *testing.T) {
name: "Unknown data download status",
dd: dataDownloadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "dd is cancel on new",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectCancelRecord: true,
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "new dd but constrained",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
constrained: true,
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "new dd but no target PVC",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
Expand All @@ -308,12 +324,6 @@ func TestDataDownloadReconcile(t *testing.T) {
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "error accepting the data download datadownload-1: exclusive-update-error",
},
{
name: "dd is cancel on accepted",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "dd is accepted but setup expose param failed",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Result(),
Expand Down Expand Up @@ -488,6 +498,10 @@ func TestDataDownloadReconcile(t *testing.T) {
r.cancelledDataDownload[test.dd.Name] = test.sportTime.Time
}

if test.constrained {
r.vgdpCounter = &exposer.VgdpCounter{}
}

funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload
if test.needExclusiveUpdateError != nil {
funcExclusiveUpdateDataDownload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataDownload, func(*velerov2alpha1api.DataDownload)) (bool, error) {
Expand Down Expand Up @@ -571,13 +585,13 @@ func TestDataDownloadReconcile(t *testing.T) {
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}

if test.expected != nil || test.expectDeleted {
dd := velerov2alpha1api.DataDownload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.dd.Name,
Namespace: test.dd.Namespace,
}, &dd)
dd := velerov2alpha1api.DataDownload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.dd.Name,
Namespace: test.dd.Namespace,
}, &dd)

if test.expected != nil || test.expectDeleted {
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
Expand All @@ -601,6 +615,12 @@ func TestDataDownloadReconcile(t *testing.T) {
} else {
assert.Empty(t, r.cancelledDataDownload)
}

if isDataDownloadInFinalState(&dd) || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
assert.NotContains(t, dd.Labels, exposer.ExposeOnGoingLabel)
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
assert.Contains(t, dd.Labels, exposer.ExposeOnGoingLabel)
}
})
}
}
Expand Down
Loading
Loading