Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8b2979b
provider: adding provide and reprovide queue
guillaumemichel Jul 24, 2025
4049cfd
Merge branch 'provider' into provider-queues
guillaumemichel Jul 24, 2025
fac928e
provider: network operations
guillaumemichel Jul 24, 2025
87c3d14
add some tests
guillaumemichel Jul 24, 2025
7b681e5
schedule prefix len computations
guillaumemichel Jul 24, 2025
a191032
provider schedule
guillaumemichel Jul 24, 2025
4081f02
addressed review
guillaumemichel Aug 5, 2025
83e6f59
Merge branch 'provider' into provider-queues
guillaumemichel Aug 5, 2025
db73ea4
use go-test/random
guillaumemichel Aug 5, 2025
e0ece0a
satisfy linter
guillaumemichel Aug 5, 2025
039e0a4
Merge branch 'provider-queues' into provider-network-ops
guillaumemichel Aug 5, 2025
47f82bf
Merge branch 'provider' into provider-network-ops
guillaumemichel Aug 5, 2025
c707b63
Merge branch 'provider-network-ops' into provider-prefixlen
guillaumemichel Aug 5, 2025
fd95218
log errors during initial prefix len measurement
guillaumemichel Aug 5, 2025
b117913
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 5, 2025
c8c4bc3
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 5, 2025
fbe5f8a
address review
guillaumemichel Aug 5, 2025
e1ddf61
satisfy linter
guillaumemichel Aug 5, 2025
b64abe5
simplify unscheduleSubsumedPrefixesNoClock
guillaumemichel Aug 8, 2025
486a39c
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 8, 2025
59c9e9a
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 8, 2025
1c13df6
Merge branch 'provider' into provider-schedule
guillaumemichel Aug 13, 2025
0e05081
moved maxPrefixSize const to top
guillaumemichel Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 150 additions & 3 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package provider
import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -61,6 +62,10 @@ type DHTProvider interface {

var _ DHTProvider = &SweepingProvider{}

// maxPrefixSize is the maximum size of a prefix used to define a keyspace
// region.
const maxPrefixSize = 24

var logger = logging.Logger("dht/SweepingProvider")

type KadClosestPeersRouter interface {
Expand All @@ -76,11 +81,17 @@ type SweepingProvider struct {
order bit256.Key
router KadClosestPeersRouter

clock clock.Clock

maxProvideConnsPerWorker int

schedule *trie.Trie[bitstr.Key, time.Duration]
clock clock.Clock
cycleStart time.Time
reprovideInterval time.Duration
maxReprovideDelay time.Duration

schedule *trie.Trie[bitstr.Key, time.Duration]
scheduleCursor bitstr.Key
scheduleTimer *clock.Timer
scheduleTimerStartedAt time.Time

avgPrefixLenLk sync.Mutex
avgPrefixLenReady chan struct{}
Expand All @@ -99,6 +110,7 @@ func (s *SweepingProvider) SatisfyLinter() {
s.closestPeersToKey("")
s.measureInitialPrefixLen()
s.getAvgPrefixLenNoLock()
s.schedulePrefixNoLock("", false)
}

// Close stops the provider and releases all resources.
Expand All @@ -115,6 +127,141 @@ func (s *SweepingProvider) closed() bool {
}
}

// scheduleNextReprovideNoLock makes sure the scheduler wakes up in
// `timeUntilReprovide` to reprovide the region identified by `prefix`.
func (s *SweepingProvider) scheduleNextReprovideNoLock(prefix bitstr.Key, timeUntilReprovide time.Duration) {
s.scheduleCursor = prefix
s.scheduleTimer.Reset(timeUntilReprovide)
s.scheduleTimerStartedAt = s.clock.Now()
}

// schedulePrefixNoLock adds the supplied prefix to the schedule, unless
// already present.
//
// If `justReprovided` is true, it will schedule the next reprovide at most
// s.reprovideInterval+s.maxReprovideDelay in the future, allowing the
// reprovide to be delayed of at most maxReprovideDelay.
//
// If the supplied prefix is the next prefix to be reprovided, update the
// schedule cursor and timer.
func (s *SweepingProvider) schedulePrefixNoLock(prefix bitstr.Key, justReprovided bool) {
nextReprovideTime := s.reprovideTimeForPrefix(prefix)
if justReprovided {
// Schedule next reprovide given that the prefix was just reprovided on
// schedule. In the case the next reprovide time should be delayed due to a
// growth in the number of network peers matching the prefix, don't delay
// more than s.maxReprovideDelay.
nextReprovideTime = min(nextReprovideTime, s.currentTimeOffset()+s.reprovideInterval+s.maxReprovideDelay)
}
// If schedule contains keys starting with prefix, remove them to avoid
// overlap.
if _, ok := keyspace.FindPrefixOfKey(s.schedule, prefix); ok {
// Already scheduled.
return
}
// Unschedule superstrings in schedule if any.
s.unscheduleSubsumedPrefixesNoLock(prefix)

s.schedule.Add(prefix, nextReprovideTime)

// Check if the prefix that was just added is the next one to be reprovided.
if s.schedule.IsNonEmptyLeaf() {
// The prefix we insterted is the only element in the schedule.
timeUntilPrefixReprovide := s.timeUntil(nextReprovideTime)
s.scheduleNextReprovideNoLock(prefix, timeUntilPrefixReprovide)
return
}
followingKey := keyspace.NextNonEmptyLeaf(s.schedule, prefix, s.order).Key
if followingKey == s.scheduleCursor {
// The key following prefix is the schedule cursor.
timeUntilPrefixReprovide := s.timeUntil(nextReprovideTime)
_, scheduledAlarm := trie.Find(s.schedule, s.scheduleCursor)
if timeUntilPrefixReprovide < s.timeUntil(scheduledAlarm) {
s.scheduleNextReprovideNoLock(prefix, timeUntilPrefixReprovide)
}
}
}

// unscheduleSubsumedPrefixes removes all superstrings of `prefix` that are
// scheduled in the future. Assumes that the schedule lock is held.
func (s *SweepingProvider) unscheduleSubsumedPrefixesNoLock(prefix bitstr.Key) {
// Pop prefixes scheduled in the future being covered by the explored peers.
keyspace.PruneSubtrie(s.schedule, prefix)

// If we removed s.scheduleCursor from schedule, select the next one
if keyspace.IsBitstrPrefix(prefix, s.scheduleCursor) {
next := keyspace.NextNonEmptyLeaf(s.schedule, s.scheduleCursor, s.order)
if next == nil {
s.scheduleNextReprovideNoLock(prefix, s.reprovideInterval)
} else {
timeUntilReprovide := s.timeUntil(next.Data)
s.scheduleNextReprovideNoLock(next.Key, timeUntilReprovide)
logger.Warnf("next scheduled prefix now is %s", s.scheduleCursor)
}
}
}

// currentTimeOffset returns the current time offset in the reprovide cycle.
func (s *SweepingProvider) currentTimeOffset() time.Duration {
return s.timeOffset(s.clock.Now())
}

// timeOffset returns the time offset in the reprovide cycle for the given
// time.
func (s *SweepingProvider) timeOffset(t time.Time) time.Duration {
return t.Sub(s.cycleStart) % s.reprovideInterval
}

// timeUntil returns the time left (duration) until the given time offset.
func (s *SweepingProvider) timeUntil(d time.Duration) time.Duration {
return s.timeBetween(s.currentTimeOffset(), d)
}

// timeBetween returns the duration between the two provided offsets, assuming
// it is no more than s.reprovideInterval.
func (s *SweepingProvider) timeBetween(from, to time.Duration) time.Duration {
return (to-from+s.reprovideInterval-1)%s.reprovideInterval + 1
}

// reprovideTimeForPrefix calculates the scheduled time offset for reproviding
// keys associated with a given prefix based on its bitstring prefix. The
// function maps the given binary prefix to a fraction of the overall reprovide
// interval (s.reprovideInterval), such that keys with prefixes closer to a
// configured target s.order (in XOR distance) are scheduled earlier and those
// further away later in the cycle.
//
// For any prefix of bit length n, the function generates 2^n distinct
// reprovide times that evenly partition the entire reprovide interval. The
// process first truncates s.order to n bits and then XORs it with the provided
// prefix. The resulting binary string is converted to an integer,
// corresponding to the index of the 2^n possible reprovide times to use for
// the prefix.
//
// This method ensures a deterministic and evenly distributed reprovide
// schedule, where the temporal position within the cycle is based on the
// binary representation of the key's prefix.
func (s *SweepingProvider) reprovideTimeForPrefix(prefix bitstr.Key) time.Duration {
if len(prefix) == 0 {
// Empty prefix: all reprovides occur at the beginning of the cycle.
return 0
}
if len(prefix) > maxPrefixSize {
// Truncate the prefix to the maximum allowed size to avoid overly fine
// slicing of time.
prefix = prefix[:maxPrefixSize]
}
// Number of possible bitstrings of the same length as prefix.
maxInt := int64(1 << len(prefix))
// XOR the prefix with the order key to reorder the schedule: keys "close" to
// s.order are scheduled first in the cycle, and those "far" from it are
// scheduled later.
order := bitstr.Key(key.BitString(s.order)[:len(prefix)])
k := prefix.Xor(order)
val, _ := strconv.ParseInt(string(k), 2, 64)
// Calculate the time offset as a fraction of the overall reprovide interval.
return time.Duration(int64(s.reprovideInterval) * val / maxInt)
}

const initialGetClosestPeers = 4

// measureInitialPrefixLen makes a few GetClosestPeers calls to get an estimate
Expand Down
30 changes: 30 additions & 0 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package provider

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"strconv"
"testing"
"time"

"github.com/ipfs/go-test/random"
kb "github.com/libp2p/go-libp2p-kbucket"
Expand Down Expand Up @@ -116,3 +118,31 @@ func TestKeysAllocationsToPeers(t *testing.T) {
}
}
}

func TestReprovideTimeForPrefixWithOrderZero(t *testing.T) {
s := SweepingProvider{
reprovideInterval: 16 * time.Second,
order: bit256.ZeroKey(),
}

require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("0"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("1"))
require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("000"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("1000"))
require.Equal(t, 10*time.Second, s.reprovideTimeForPrefix("1010"))
require.Equal(t, 15*time.Second, s.reprovideTimeForPrefix("1111"))
}

func TestReprovideTimeForPrefixWithCustomOrder(t *testing.T) {
s := SweepingProvider{
reprovideInterval: 16 * time.Second,
order: bit256.NewKey(bytes.Repeat([]byte{0xFF}, 32)), // 111...1
}

require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("1"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("0"))
require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("111"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("0111"))
require.Equal(t, 10*time.Second, s.reprovideTimeForPrefix("0101"))
require.Equal(t, 15*time.Second, s.reprovideTimeForPrefix("0000"))
}
Loading