Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
8b2979b
provider: adding provide and reprovide queue
guillaumemichel Jul 24, 2025
4049cfd
Merge branch 'provider' into provider-queues
guillaumemichel Jul 24, 2025
fac928e
provider: network operations
guillaumemichel Jul 24, 2025
87c3d14
add some tests
guillaumemichel Jul 24, 2025
7b681e5
schedule prefix len computations
guillaumemichel Jul 24, 2025
a191032
provider schedule
guillaumemichel Jul 24, 2025
41fcb41
provider: handleProvide
guillaumemichel Jul 24, 2025
4081f02
addressed review
guillaumemichel Aug 5, 2025
83e6f59
Merge branch 'provider' into provider-queues
guillaumemichel Aug 5, 2025
db73ea4
use go-test/random
guillaumemichel Aug 5, 2025
e0ece0a
satisfy linter
guillaumemichel Aug 5, 2025
039e0a4
Merge branch 'provider-queues' into provider-network-ops
guillaumemichel Aug 5, 2025
47f82bf
Merge branch 'provider' into provider-network-ops
guillaumemichel Aug 5, 2025
c707b63
Merge branch 'provider-network-ops' into provider-prefixlen
guillaumemichel Aug 5, 2025
fd95218
log errors during initial prefix len measurement
guillaumemichel Aug 5, 2025
b117913
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 5, 2025
c8c4bc3
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 5, 2025
fbe5f8a
address review
guillaumemichel Aug 5, 2025
e1ddf61
satisfy linter
guillaumemichel Aug 5, 2025
634601e
Merge branch 'provider-schedule' into provider-handleProvide
guillaumemichel Aug 5, 2025
834802b
address review
guillaumemichel Aug 5, 2025
5fa93f1
provider: explore swarm
guillaumemichel Aug 6, 2025
2889495
provider: batch provide
guillaumemichel Aug 6, 2025
47f65cb
provider: batch reprovide
guillaumemichel Aug 6, 2025
b9f9004
fix panic when adding key to trie if superstring already exists
guillaumemichel Aug 7, 2025
17335b6
address review
guillaumemichel Aug 8, 2025
31a1797
decrease minimal region size from replicationFactor+1 to replicationF…
guillaumemichel Aug 8, 2025
b64abe5
simplify unscheduleSubsumedPrefixesNoClock
guillaumemichel Aug 8, 2025
486a39c
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 8, 2025
59c9e9a
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 8, 2025
48a0033
Merge branch 'provider-schedule' into provider-handleProvide
guillaumemichel Aug 8, 2025
425565a
address review
guillaumemichel Aug 8, 2025
6900ec9
Merge branch 'provider-handleProvide' into provider-exploreSwarm
guillaumemichel Aug 8, 2025
1299170
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 8, 2025
4111c0c
Merge branch 'provider-batchProvide' into provider-batchReprovide
guillaumemichel Aug 8, 2025
d318fae
fix test to match region size (now: replicationFactor, before: replic…
guillaumemichel Aug 8, 2025
79e38d9
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 8, 2025
bbfba3d
Merge branch 'provider-batchProvide' into provider-batchReprovide
guillaumemichel Aug 8, 2025
c7f77e1
refactor and test groupAndScheduleKeysByPrefix
guillaumemichel Aug 8, 2025
1c13df6
Merge branch 'provider' into provider-schedule
guillaumemichel Aug 13, 2025
0e05081
moved maxPrefixSize const to top
guillaumemichel Aug 13, 2025
d0f030a
Merge branch 'provider-schedule' into provider-handleProvide
guillaumemichel Aug 13, 2025
b1f376e
Merge branch 'provider-handleProvide' into provider-exploreSwarm
guillaumemichel Aug 13, 2025
ab7b9e6
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 13, 2025
933aace
Merge branch 'provider-schedule' into provider-handleProvide
guillaumemichel Aug 13, 2025
d09b224
Merge branch 'provider-handleProvide' into provider-exploreSwarm
guillaumemichel Aug 13, 2025
a69033a
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 13, 2025
5919066
Merge branch 'provider' into provider-handleProvide
guillaumemichel Aug 13, 2025
553a92e
Merge branch 'provider-handleProvide' into provider-exploreSwarm
guillaumemichel Aug 13, 2025
f4d5789
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 13, 2025
6e1c0c8
Merge branch 'provider' into provider-exploreSwarm
guillaumemichel Aug 13, 2025
1f91ac2
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 13, 2025
71cb716
address review
guillaumemichel Aug 13, 2025
1b086a7
Merge branch 'provider' into provider-batchProvide
guillaumemichel Aug 13, 2025
056104c
Merge branch 'provider-batchProvide' into provider-batchReprovide
guillaumemichel Aug 13, 2025
8a3d816
Merge branch 'provider' into provider-batchReprovide
guillaumemichel Aug 13, 2025
cd338c8
address review
guillaumemichel Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 184 additions & 5 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type SweepingProvider struct {

provideQueue *queue.ProvideQueue
provideRunning sync.Mutex
reprovideQueue *queue.ReprovideQueue

workerPool *pool.Pool[workerType]
maxProvideConnsPerWorker int
Expand All @@ -126,6 +127,9 @@ type SweepingProvider struct {
scheduleTimer *clock.Timer
scheduleTimerStartedAt time.Time

ongoingReprovides *trie.Trie[bitstr.Key, struct{}]
ongoingReprovidesLk sync.Mutex

avgPrefixLenLk sync.Mutex
avgPrefixLenReady chan struct{}
cachedAvgPrefixLen int
Expand All @@ -141,8 +145,8 @@ type SweepingProvider struct {

// FIXME: remove me
func (s *SweepingProvider) SatisfyLinter() {
s.vanillaProvide([]byte{})
s.measureInitialPrefixLen()
s.batchReprovide("", true)
}

// Close stops the provider and releases all resources.
Expand All @@ -167,6 +171,12 @@ func (s *SweepingProvider) scheduleNextReprovideNoLock(prefix bitstr.Key, timeUn
s.scheduleTimerStartedAt = s.clock.Now()
}

func (s *SweepingProvider) reschedulePrefix(prefix bitstr.Key) {
s.scheduleLk.Lock()
s.schedulePrefixNoLock(prefix, true)
s.scheduleLk.Unlock()
}

// schedulePrefixNoLock adds the supplied prefix to the schedule, unless
// already present.
//
Expand Down Expand Up @@ -776,7 +786,79 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash)
keys = append(keys, extraKeys...)
regions = keyspace.AssignKeysToRegions(regions, keys)

if !s.provideRegions(regions, addrInfo) {
if !s.provideRegions(regions, addrInfo, false, false) {
logger.Errorf("failed to reprovide any region for prefix %s", prefix)
}
}

func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide bool) {
addrInfo, ok := s.selfAddrInfo()
if !ok {
// Don't provide if the node doesn't have a valid address to include in the
// provider record.
return
}

// Load keys matching prefix from the keystore.
keys, err := s.keyStore.Get(context.Background(), prefix)
if err != nil {
s.failedReprovide(prefix, fmt.Errorf("couldn't reprovide, error when loading keys: %s", err))
if periodicReprovide {
s.reschedulePrefix(prefix)
}
return
}
if len(keys) == 0 {
logger.Infof("No keys to reprovide for prefix %s", prefix)
return
}
if len(keys) <= individualProvideThreshold {
// Don't fully explore the region, execute simple DHT provides for these
// keys. It isn't worth it to fully explore a region for just a few keys.
s.individualProvide(prefix, keys, true, periodicReprovide)
return
}

regions, coveredPrefix, err := s.exploreSwarm(prefix)
if err != nil {
s.failedReprovide(prefix, fmt.Errorf("reprovide '%s': %w", prefix, err))
if periodicReprovide {
s.reschedulePrefix(prefix)
}
return
}
logger.Debugf("reprovide: requested prefix '%s' (len %d), prefix covered '%s' (len %d)", prefix, len(prefix), coveredPrefix, len(coveredPrefix))

regions = s.claimRegionReprovide(regions)

// Remove all keys matching coveredPrefix from provide queue. No need to
// provide them anymore since they are about to be reprovided.
s.provideQueue.DequeueMatching(coveredPrefix)
// Remove covered prefix from the reprovide queue, so since we are about the
// reprovide the region.
s.reprovideQueue.Remove(coveredPrefix)

// When reproviding a region, remove all scheduled regions starting with
// the currently covered prefix.
s.scheduleLk.Lock()
s.unscheduleSubsumedPrefixesNoLock(coveredPrefix)
s.scheduleLk.Unlock()

if len(coveredPrefix) < len(prefix) {
// Covered prefix is shorter than the requested one, load all the keys
// matching the covered prefix from the keystore.
keys, err = s.keyStore.Get(context.Background(), coveredPrefix)
if err != nil {
err = fmt.Errorf("couldn't reprovide, error when loading keys: %s", err)
s.failedReprovide(prefix, err)
if periodicReprovide {
s.reschedulePrefix(prefix)
}
}
}
regions = keyspace.AssignKeysToRegions(regions, keys)

if !s.provideRegions(regions, addrInfo, true, periodicReprovide) {
logger.Errorf("failed to reprovide any region for prefix %s", prefix)
}
}
Expand All @@ -789,6 +871,14 @@ func (s *SweepingProvider) failedProvide(prefix bitstr.Key, keys []mh.Multihash,
s.connectivity.TriggerCheck()
}

func (s *SweepingProvider) failedReprovide(prefix bitstr.Key, err error) {
logger.Error(err)
// Put prefix in the reprovide queue.
s.reprovideQueue.Enqueue(prefix)

s.connectivity.TriggerCheck()
}

// selfAddrInfo returns the current peer.AddrInfo to be used in the provider
// records sent to remote peers.
//
Expand All @@ -808,17 +898,69 @@ func (s *SweepingProvider) selfAddrInfo() (peer.AddrInfo, bool) {
// provides for the supplied keys, handles failures and schedules next
// reprovide is necessary.
func (s *SweepingProvider) individualProvide(prefix bitstr.Key, keys []mh.Multihash, reprovide bool, periodicReprovide bool) {
// TODO: implement me
if len(keys) == 0 {
return
}

var provideErr error
if len(keys) == 1 {
coveredPrefix, err := s.vanillaProvide(keys[0])
if err == nil {
s.provideCounter.Add(context.Background(), 1)
} else if !reprovide {
// Put the key back in the provide queue.
s.failedProvide(prefix, keys, fmt.Errorf("individual provide failed for prefix '%s', %w", prefix, err))
}
provideErr = err
if periodicReprovide {
// Schedule next reprovide for the prefix that was actually covered by
// the GCP, otherwise we may schedule a reprovide for a prefix too short
// or too long.
s.reschedulePrefix(coveredPrefix)
}
} else {
wg := sync.WaitGroup{}
success := atomic.Bool{}
for _, key := range keys {
wg.Add(1)
go func() {
defer wg.Done()
_, err := s.vanillaProvide(key)
if err == nil {
s.provideCounter.Add(context.Background(), 1)
success.Store(true)
} else if !reprovide {
// Individual provide failed, put key back in provide queue.
s.failedProvide(prefix, []mh.Multihash{key}, err)
}
}()
}
wg.Wait()

if !success.Load() {
// Only errors if all provides failed.
provideErr = fmt.Errorf("all individual provides failed for prefix %s", prefix)
}
if periodicReprovide {
s.reschedulePrefix(prefix)
}
}
if reprovide && provideErr != nil {
s.failedReprovide(prefix, provideErr)
}
}

// provideRegions contains common logic to batchProvide() and batchReprovide().
// It iterate over supplied regions, and allocates the regions provider records
// to the appropriate DHT servers.
func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo peer.AddrInfo) bool {
func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo peer.AddrInfo, reprovide, periodicReprovide bool) bool {
errCount := 0
for _, r := range regions {
nKeys := r.Keys.Size()
if nKeys == 0 {
if reprovide {
s.releaseRegionReprovide(r.Prefix)
}
continue
}
// Add keys to local provider store
Expand All @@ -827,10 +969,20 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe
}
keysAllocations := keyspace.AllocateToKClosest(r.Keys, r.Peers, s.replicationFactor)
err := s.sendProviderRecords(keysAllocations, addrInfo)
if reprovide {
s.releaseRegionReprovide(r.Prefix)
if periodicReprovide {
s.reschedulePrefix(r.Prefix)
}
}
if err != nil {
errCount++
err = fmt.Errorf("cannot send provider records for region %s: %s", r.Prefix, err)
s.failedProvide(r.Prefix, keyspace.AllValues(r.Keys, s.order), err)
if reprovide {
s.failedReprovide(r.Prefix, err)
} else { // provide operation
s.failedProvide(r.Prefix, keyspace.AllValues(r.Keys, s.order), err)
}
continue
}
s.provideCounter.Add(context.Background(), int64(nKeys))
Expand All @@ -840,6 +992,33 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe
return errCount < len(regions)
}

// claimRegionReprovide checks if the region is already being reprovided by
// another thread. If not it marks the region as being currently reprovided.
func (s *SweepingProvider) claimRegionReprovide(regions []keyspace.Region) []keyspace.Region {
out := regions[:0]
s.ongoingReprovidesLk.Lock()
defer s.ongoingReprovidesLk.Unlock()
for _, r := range regions {
if r.Peers.IsEmptyLeaf() {
continue
}
if _, ok := keyspace.FindPrefixOfKey(s.ongoingReprovides, r.Prefix); !ok {
// Prune superstrings of r.Prefix if any
keyspace.PruneSubtrie(s.ongoingReprovides, r.Prefix)
out = append(out, r)
s.ongoingReprovides.Add(r.Prefix, struct{}{})
}
}
return out
}

// releaseRegionReprovide marks the region as no longer being reprovided.
func (s *SweepingProvider) releaseRegionReprovide(prefix bitstr.Key) {
s.ongoingReprovidesLk.Lock()
defer s.ongoingReprovidesLk.Unlock()
s.ongoingReprovides.Remove(prefix)
}

// ProvideOnce only sends provider records for the given keys out to the DHT
// swarm. It does NOT take the responsibility to reprovide these keys.
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
Expand Down
Loading
Loading