Skip to content

Commit 6c0ed1e

Browse files
Merge pull request #8366 from sseago/synchronise-backedupitems
Make BackedUpItems thread safe
2 parents 2e5df85 + 015b1e6 commit 6c0ed1e

File tree

9 files changed

+154
-44
lines changed

9 files changed

+154
-44
lines changed

changelogs/unreleased/8366-sseago

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make BackedUpItems thread safe

pkg/backup/backed_up_items_map.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright the Velero contributors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package backup
18+
19+
import (
20+
"fmt"
21+
"sort"
22+
"sync"
23+
)
24+
25+
// backedUpItemsMap keeps track of the items already backed up for the current Velero Backup
26+
type backedUpItemsMap struct {
27+
*sync.RWMutex
28+
backedUpItems map[itemKey]struct{}
29+
}
30+
31+
func NewBackedUpItemsMap() *backedUpItemsMap {
32+
return &backedUpItemsMap{
33+
RWMutex: &sync.RWMutex{},
34+
backedUpItems: make(map[itemKey]struct{}),
35+
}
36+
}
37+
38+
func (m *backedUpItemsMap) CopyItemMap() map[itemKey]struct{} {
39+
m.RLock()
40+
defer m.RUnlock()
41+
returnMap := make(map[itemKey]struct{}, len(m.backedUpItems))
42+
for key, val := range m.backedUpItems {
43+
returnMap[key] = val
44+
}
45+
return returnMap
46+
}
47+
48+
// ResourceMap returns a map of the backed up items.
49+
// For each map entry, the key is the resource type,
50+
// and the value is a list of namespaced names for the resource.
51+
func (m *backedUpItemsMap) ResourceMap() map[string][]string {
52+
m.RLock()
53+
defer m.RUnlock()
54+
55+
resources := map[string][]string{}
56+
for i := range m.backedUpItems {
57+
entry := i.name
58+
if i.namespace != "" {
59+
entry = fmt.Sprintf("%s/%s", i.namespace, i.name)
60+
}
61+
resources[i.resource] = append(resources[i.resource], entry)
62+
}
63+
64+
// sort namespace/name entries for each GVK
65+
for _, v := range resources {
66+
sort.Strings(v)
67+
}
68+
69+
return resources
70+
}
71+
72+
func (m *backedUpItemsMap) Len() int {
73+
m.RLock()
74+
defer m.RUnlock()
75+
return len(m.backedUpItems)
76+
}
77+
78+
func (m *backedUpItemsMap) Has(key itemKey) bool {
79+
m.RLock()
80+
defer m.RUnlock()
81+
82+
_, exists := m.backedUpItems[key]
83+
return exists
84+
}
85+
86+
func (m *backedUpItemsMap) AddItem(key itemKey) {
87+
m.Lock()
88+
defer m.Unlock()
89+
m.backedUpItems[key] = struct{}{}
90+
}

pkg/backup/backup.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,6 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
297297
return err
298298
}
299299

300-
backupRequest.BackedUpItems = map[itemKey]struct{}{}
301-
302300
podVolumeTimeout := kb.podVolumeTimeout
303301
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
304302
parsed, err := time.ParseDuration(val)
@@ -499,20 +497,21 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
499497

500498
// updated total is computed as "how many items we've backed up so far, plus
501499
// how many items we know of that are remaining"
502-
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
500+
backedUpItems := backupRequest.BackedUpItems.Len()
501+
totalItems := backedUpItems + (len(items) - (i + 1))
503502

504503
// send a progress update
505504
update <- progressUpdate{
506505
totalItems: totalItems,
507-
itemsBackedUp: len(backupRequest.BackedUpItems),
506+
itemsBackedUp: backedUpItems,
508507
}
509508

510509
log.WithFields(map[string]interface{}{
511510
"progress": "",
512511
"resource": items[i].groupResource.String(),
513512
"namespace": items[i].namespace,
514513
"name": items[i].name,
515-
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
514+
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
516515
}
517516

518517
// no more progress updates will be sent on the 'update' channel
@@ -538,8 +537,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
538537
if updated.Status.Progress == nil {
539538
updated.Status.Progress = &velerov1api.BackupProgress{}
540539
}
541-
updated.Status.Progress.TotalItems = len(backupRequest.BackedUpItems)
542-
updated.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems)
540+
backedUpItems := backupRequest.BackedUpItems.Len()
541+
updated.Status.Progress.TotalItems = backedUpItems
542+
updated.Status.Progress.ItemsBackedUp = backedUpItems
543543

544544
// update the hooks execution status
545545
if updated.Status.HookStatus == nil {
@@ -558,8 +558,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
558558
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
559559
}
560560

561-
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
562-
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
561+
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: backedUpItems, ItemsBackedUp: backedUpItems}
562+
log.WithField("progress", "").Infof("Backed up a total of %d items", backedUpItems)
563563

564564
return nil
565565
}
@@ -667,7 +667,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
667667
continue
668668
}
669669
// Don't run hooks if pod has already been backed up
670-
if _, exists := itemBlock.itemBackupper.backupRequest.BackedUpItems[key]; !exists {
670+
if !itemBlock.itemBackupper.backupRequest.BackedUpItems.Has(key) {
671671
preHookPods = append(preHookPods, item)
672672
}
673673
}
@@ -681,7 +681,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
681681
itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
682682
continue
683683
}
684-
itemBlock.itemBackupper.backupRequest.BackedUpItems[key] = struct{}{}
684+
itemBlock.itemBackupper.backupRequest.BackedUpItems.AddItem(key)
685685
}
686686

687687
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
@@ -861,8 +861,6 @@ func (kb *kubernetesBackupper) FinalizeBackup(
861861
return err
862862
}
863863

864-
backupRequest.BackedUpItems = map[itemKey]struct{}{}
865-
866864
// set up a temp dir for the itemCollector to use to temporarily
867865
// store items as they're scraped from the API.
868866
tempDir, err := os.MkdirTemp("", "")
@@ -947,14 +945,15 @@ func (kb *kubernetesBackupper) FinalizeBackup(
947945

948946
// updated total is computed as "how many items we've backed up so far, plus
949947
// how many items we know of that are remaining"
950-
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
948+
backedUpItems := backupRequest.BackedUpItems.Len()
949+
totalItems := backedUpItems + (len(items) - (i + 1))
951950

952951
log.WithFields(map[string]interface{}{
953952
"progress": "",
954953
"resource": item.groupResource.String(),
955954
"namespace": item.namespace,
956955
"name": item.name,
957-
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", len(backupRequest.BackedUpItems), totalItems)
956+
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", backedUpItems, totalItems)
958957
}
959958

960959
volumeInfos, err := backupStore.GetBackupVolumeInfos(backupRequest.Backup.Name)
@@ -979,7 +978,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
979978
return err
980979
}
981980

982-
log.WithField("progress", "").Infof("Updated a total of %d items", len(backupRequest.BackedUpItems))
981+
log.WithField("progress", "").Infof("Updated a total of %d items", backupRequest.BackedUpItems.Len())
983982

984983
return nil
985984
}

0 commit comments

Comments
 (0)