Skip to content

Commit 8427a9f

Browse files
authored
Merge pull request #6308 from Lyndon-Li/data-mover-backup-expose-02
Data mover backup expose
2 parents 80db04e + 25624d3 commit 8427a9f

File tree

10 files changed

+1538
-0
lines changed

10 files changed

+1538
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add the code for data mover backup expose

pkg/exposer/csi_snapshot.go

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
/*
2+
Copyright The Velero Contributors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package exposer
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"github.com/pkg/errors"
24+
"github.com/sirupsen/logrus"
25+
26+
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
27+
28+
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/apimachinery/pkg/api/resource"
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/types"
32+
"k8s.io/client-go/kubernetes"
33+
34+
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
35+
36+
"github.com/vmware-tanzu/velero/pkg/util/csi"
37+
"github.com/vmware-tanzu/velero/pkg/util/kube"
38+
39+
snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/typed/volumesnapshot/v1"
40+
"sigs.k8s.io/controller-runtime/pkg/client"
41+
42+
apierrors "k8s.io/apimachinery/pkg/api/errors"
43+
)
44+
45+
// CSISnapshotExposeParam define the input param for Expose of CSI snapshots
46+
type CSISnapshotExposeParam struct {
47+
// SourceNamespace is the original namespace of the volume that the snapshot is taken for
48+
SourceNamespace string
49+
50+
// AccessMode defines the mode to access the snapshot
51+
AccessMode string
52+
53+
// StorageClass is the storage class of the volume that the snapshot is taken for
54+
StorageClass string
55+
56+
// HostingPodLabels is the labels that are going to apply to the hosting pod
57+
HostingPodLabels map[string]string
58+
}
59+
60+
// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots
61+
type CSISnapshotExposeWaitParam struct {
62+
// NodeClient is the client that is used to find the hosting pod
63+
NodeClient client.Client
64+
NodeName string
65+
}
66+
67+
// NewCSISnapshotExposer create a new instance of CSI snapshot exposer
68+
func NewCSISnapshotExposer(kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, log logrus.FieldLogger) SnapshotExposer {
69+
return &csiSnapshotExposer{
70+
kubeClient: kubeClient,
71+
csiSnapshotClient: csiSnapshotClient,
72+
log: log,
73+
}
74+
}
75+
76+
type csiSnapshotExposer struct {
77+
kubeClient kubernetes.Interface
78+
csiSnapshotClient snapshotter.SnapshotV1Interface
79+
log logrus.FieldLogger
80+
}
81+
82+
func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, snapshotName string, timeout time.Duration, param interface{}) error {
83+
csiExposeParam := param.(*CSISnapshotExposeParam)
84+
85+
curLog := e.log.WithFields(logrus.Fields{
86+
"owner": ownerObject.Name,
87+
})
88+
89+
curLog.Info("Exposing CSI snapshot")
90+
91+
volumeSnapshot, err := csi.WaitVolumeSnapshotReady(ctx, e.csiSnapshotClient, snapshotName, csiExposeParam.SourceNamespace, timeout)
92+
if err != nil {
93+
return errors.Wrapf(err, "error wait volume snapshot ready")
94+
}
95+
96+
curLog.Info("Volumesnapshot is ready")
97+
98+
vsc, err := csi.GetVolumeSnapshotContentForVolumeSnapshot(volumeSnapshot, e.csiSnapshotClient)
99+
if err != nil {
100+
return errors.Wrap(err, "error to get volume snapshot content")
101+
}
102+
103+
curLog.WithField("vsc name", vsc.Name).WithField("vs name", volumeSnapshot.Name).Infof("Got VSC from VS in namespace %s", volumeSnapshot.Namespace)
104+
105+
retained, err := csi.RetainVSC(ctx, e.csiSnapshotClient, vsc)
106+
if err != nil {
107+
return errors.Wrap(err, "error to retain volume snapshot content")
108+
}
109+
110+
curLog.WithField("vsc name", vsc.Name).WithField("retained", (retained != nil)).Info("Finished to retain VSC")
111+
112+
defer func() {
113+
if retained != nil {
114+
csi.DeleteVolumeSnapshotContentIfAny(ctx, e.csiSnapshotClient, retained.Name, curLog)
115+
}
116+
}()
117+
118+
err = csi.EnsureDeleteVS(ctx, e.csiSnapshotClient, volumeSnapshot.Name, volumeSnapshot.Namespace, timeout)
119+
if err != nil {
120+
return errors.Wrap(err, "error to delete volume snapshot")
121+
}
122+
123+
curLog.WithField("vs name", volumeSnapshot.Name).Infof("VS is deleted in namespace %s", volumeSnapshot.Namespace)
124+
125+
err = csi.EnsureDeleteVSC(ctx, e.csiSnapshotClient, vsc.Name, timeout)
126+
if err != nil {
127+
return errors.Wrap(err, "error to delete volume snapshot content")
128+
}
129+
130+
curLog.WithField("vsc name", vsc.Name).Infof("VSC is deleted")
131+
retained = nil
132+
133+
backupVS, err := e.createBackupVS(ctx, ownerObject, volumeSnapshot)
134+
if err != nil {
135+
return errors.Wrap(err, "error to create backup volume snapshot")
136+
}
137+
138+
curLog.WithField("vs name", backupVS.Name).Infof("Backup VS is created from %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name)
139+
140+
defer func() {
141+
if err != nil {
142+
csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, backupVS.Name, backupVS.Namespace, curLog)
143+
}
144+
}()
145+
146+
backupVSC, err := e.createBackupVSC(ctx, ownerObject, vsc, backupVS)
147+
if err != nil {
148+
return errors.Wrap(err, "error to create backup volume snapshot content")
149+
}
150+
151+
curLog.WithField("vsc name", backupVSC.Name).Infof("Backup VSC is created from %s", vsc.Name)
152+
153+
backupPVC, err := e.createBackupPVC(ctx, ownerObject, backupVS.Name, csiExposeParam.StorageClass, csiExposeParam.AccessMode, *volumeSnapshot.Status.RestoreSize)
154+
if err != nil {
155+
return errors.Wrap(err, "error to create backup pvc")
156+
}
157+
158+
curLog.WithField("pvc name", backupPVC.Name).Info("Backup PVC is created")
159+
160+
defer func() {
161+
if err != nil {
162+
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), backupPVC.Name, backupPVC.Namespace, curLog)
163+
}
164+
}()
165+
166+
backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.HostingPodLabels)
167+
if err != nil {
168+
return errors.Wrap(err, "error to create backup pod")
169+
}
170+
171+
curLog.WithField("pod name", backupPod.Name).Info("Backup pod is created")
172+
173+
defer func() {
174+
if err != nil {
175+
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), backupPod.Name, backupPod.Namespace, curLog)
176+
}
177+
}()
178+
179+
return nil
180+
}
181+
182+
func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, timeout time.Duration, param interface{}) (*ExposeResult, error) {
183+
exposeWaitParam := param.(*CSISnapshotExposeWaitParam)
184+
185+
backupPodName := ownerObject.Name
186+
backupPVCName := ownerObject.Name
187+
188+
curLog := e.log.WithFields(logrus.Fields{
189+
"owner": ownerObject.Name,
190+
})
191+
192+
pod := &corev1.Pod{}
193+
err := exposeWaitParam.NodeClient.Get(ctx, types.NamespacedName{
194+
Namespace: ownerObject.Namespace,
195+
Name: backupPodName,
196+
}, pod)
197+
if err != nil {
198+
if apierrors.IsNotFound(err) {
199+
curLog.WithField("backup pod", backupPodName).Errorf("Backup pod is not running in the current node %s", exposeWaitParam.NodeName)
200+
return nil, nil
201+
} else {
202+
return nil, errors.Wrapf(err, "error to get backup pod %s", backupPodName)
203+
}
204+
}
205+
206+
curLog.WithField("pod", pod.Name).Infof("Backup pod is in running state in node %s", pod.Spec.NodeName)
207+
208+
_, err = kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), backupPVCName, ownerObject.Namespace, timeout)
209+
if err != nil {
210+
return nil, errors.Wrapf(err, "error to wait backup PVC bound, %s", backupPVCName)
211+
}
212+
213+
curLog.WithField("backup pvc", backupPVCName).Info("Backup PVC is bound")
214+
215+
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, PVC: backupPVCName}}, nil
216+
}
217+
218+
func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference, vsName string, sourceNamespace string) {
219+
backupPodName := ownerObject.Name
220+
backupPVCName := ownerObject.Name
221+
backupVSName := ownerObject.Name
222+
223+
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), backupPodName, ownerObject.Namespace, e.log)
224+
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), backupPVCName, ownerObject.Namespace, e.log)
225+
csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, backupVSName, ownerObject.Namespace, e.log)
226+
csi.DeleteVolumeSnapshotIfAny(ctx, e.csiSnapshotClient, vsName, sourceNamespace, e.log)
227+
}
228+
229+
func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) {
230+
if accessMode == AccessModeFileSystem {
231+
return corev1.PersistentVolumeFilesystem, nil
232+
} else {
233+
return "", errors.Errorf("unsupported access mode %s", accessMode)
234+
}
235+
}
236+
237+
func (e *csiSnapshotExposer) createBackupVS(ctx context.Context, ownerObject corev1.ObjectReference, snapshotVS *snapshotv1api.VolumeSnapshot) (*snapshotv1api.VolumeSnapshot, error) {
238+
backupVSName := ownerObject.Name
239+
backupVSCName := ownerObject.Name
240+
241+
vs := &snapshotv1api.VolumeSnapshot{
242+
ObjectMeta: metav1.ObjectMeta{
243+
Name: backupVSName,
244+
Namespace: ownerObject.Namespace,
245+
// Don't add ownerReference to SnapshotBackup.
246+
// The backupPVC should be deleted before backupVS, otherwise, the deletion of backupVS will fail since
247+
// backupPVC has its dataSource referring to it
248+
},
249+
Spec: snapshotv1api.VolumeSnapshotSpec{
250+
Source: snapshotv1api.VolumeSnapshotSource{
251+
VolumeSnapshotContentName: &backupVSCName,
252+
},
253+
VolumeSnapshotClassName: snapshotVS.Spec.VolumeSnapshotClassName,
254+
},
255+
}
256+
257+
return e.csiSnapshotClient.VolumeSnapshots(vs.Namespace).Create(ctx, vs, metav1.CreateOptions{})
258+
}
259+
260+
func (e *csiSnapshotExposer) createBackupVSC(ctx context.Context, ownerObject corev1.ObjectReference, snapshotVSC *snapshotv1api.VolumeSnapshotContent, vs *snapshotv1api.VolumeSnapshot) (*snapshotv1api.VolumeSnapshotContent, error) {
261+
backupVSCName := ownerObject.Name
262+
263+
vsc := &snapshotv1api.VolumeSnapshotContent{
264+
ObjectMeta: metav1.ObjectMeta{
265+
Name: backupVSCName,
266+
},
267+
Spec: snapshotv1api.VolumeSnapshotContentSpec{
268+
VolumeSnapshotRef: corev1.ObjectReference{
269+
Name: vs.Name,
270+
Namespace: vs.Namespace,
271+
UID: vs.UID,
272+
ResourceVersion: vs.ResourceVersion,
273+
},
274+
Source: snapshotv1api.VolumeSnapshotContentSource{
275+
SnapshotHandle: snapshotVSC.Status.SnapshotHandle,
276+
},
277+
DeletionPolicy: snapshotVSC.Spec.DeletionPolicy,
278+
Driver: snapshotVSC.Spec.Driver,
279+
VolumeSnapshotClassName: snapshotVSC.Spec.VolumeSnapshotClassName,
280+
},
281+
}
282+
283+
return e.csiSnapshotClient.VolumeSnapshotContents().Create(ctx, vsc, metav1.CreateOptions{})
284+
}
285+
286+
func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject corev1.ObjectReference, backupVS string, storageClass string, accessMode string, resource resource.Quantity) (*corev1.PersistentVolumeClaim, error) {
287+
backupVCName := ownerObject.Name
288+
289+
volumeMode, err := getVolumeModeByAccessMode(accessMode)
290+
if err != nil {
291+
return nil, err
292+
}
293+
294+
dataSource := &corev1.TypedLocalObjectReference{
295+
APIGroup: &snapshotv1api.SchemeGroupVersion.Group,
296+
Kind: "VolumeSnapshot",
297+
Name: backupVS,
298+
}
299+
300+
pvc := &corev1.PersistentVolumeClaim{
301+
ObjectMeta: metav1.ObjectMeta{
302+
Namespace: ownerObject.Namespace,
303+
Name: backupVCName,
304+
OwnerReferences: []metav1.OwnerReference{
305+
{
306+
APIVersion: ownerObject.APIVersion,
307+
Kind: ownerObject.Kind,
308+
Name: ownerObject.Name,
309+
UID: ownerObject.UID,
310+
Controller: boolptr.True(),
311+
},
312+
},
313+
},
314+
Spec: corev1.PersistentVolumeClaimSpec{
315+
AccessModes: []corev1.PersistentVolumeAccessMode{
316+
corev1.ReadWriteOnce,
317+
},
318+
StorageClassName: &storageClass,
319+
VolumeMode: &volumeMode,
320+
DataSource: dataSource,
321+
DataSourceRef: dataSource,
322+
323+
Resources: corev1.ResourceRequirements{
324+
Requests: corev1.ResourceList{
325+
corev1.ResourceStorage: resource,
326+
},
327+
},
328+
},
329+
}
330+
331+
created, err := e.kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
332+
if err != nil {
333+
return nil, errors.Wrap(err, "error to create pvc")
334+
}
335+
336+
return created, err
337+
}
338+
339+
func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, label map[string]string) (*corev1.Pod, error) {
340+
podName := ownerObject.Name
341+
342+
var gracePeriod int64 = 0
343+
344+
pod := &corev1.Pod{
345+
ObjectMeta: metav1.ObjectMeta{
346+
Name: podName,
347+
Namespace: ownerObject.Namespace,
348+
OwnerReferences: []metav1.OwnerReference{
349+
{
350+
APIVersion: ownerObject.APIVersion,
351+
Kind: ownerObject.Kind,
352+
Name: ownerObject.Name,
353+
UID: ownerObject.UID,
354+
Controller: boolptr.True(),
355+
},
356+
},
357+
Labels: label,
358+
},
359+
Spec: corev1.PodSpec{
360+
Containers: []corev1.Container{
361+
{
362+
Name: podName,
363+
Image: "alpine:latest",
364+
ImagePullPolicy: corev1.PullIfNotPresent,
365+
Command: []string{"sleep", "infinity"},
366+
VolumeMounts: []corev1.VolumeMount{{
367+
Name: backupPVC.Name,
368+
MountPath: "/" + backupPVC.Name,
369+
}},
370+
},
371+
},
372+
TerminationGracePeriodSeconds: &gracePeriod,
373+
Volumes: []corev1.Volume{{
374+
Name: backupPVC.Name,
375+
VolumeSource: corev1.VolumeSource{
376+
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
377+
ClaimName: backupPVC.Name,
378+
},
379+
},
380+
}},
381+
},
382+
}
383+
384+
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
385+
}

0 commit comments

Comments
 (0)