Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
ab74101
test GCP result order (#1097)
guillaumemichel Jul 12, 2025
3a0f8bb
provider initial commit
guillaumemichel Jul 9, 2025
6bd8b58
provider: keystore (#1096)
guillaumemichel Jul 23, 2025
010def7
provider: SweepingProvider interface (#1098)
guillaumemichel Jul 23, 2025
bc1f342
provider: Connectivity Checker (#1099)
guillaumemichel Jul 24, 2025
ae1c18f
provider: key helpers (#1101)
guillaumemichel Jul 24, 2025
49b3acb
provider: add ShortestCoveredPrefix helper (#1102)
guillaumemichel Jul 24, 2025
5e720eb
provider: trie items listing helpers (#1103)
guillaumemichel Jul 24, 2025
6e77db2
provider: helpers trie find prefix (#1104)
guillaumemichel Jul 24, 2025
8057765
provider: find subtrie helper (#1105)
guillaumemichel Jul 24, 2025
9c3161c
provider: NextNonEmptyLeaf trie helper (#1106)
guillaumemichel Jul 24, 2025
979ccc2
provider: PruneSubtrie helper (#1107)
guillaumemichel Jul 24, 2025
d8f1c63
provider: trie region helpers (#1109)
guillaumemichel Jul 24, 2025
c72809c
provider: helpers pacakge rename (#1111)
guillaumemichel Jul 24, 2025
e2adfb2
provider: keyspace helpers
guillaumemichel Jul 24, 2025
724b286
add missing ShortestCoveredPrefix
guillaumemichel Jul 24, 2025
d05b1f9
provider: trie allocation helper correction #1108
guillaumemichel Aug 14, 2025
a41eaa4
provider: adding provide and reprovide queue (#1114)
guillaumemichel Aug 5, 2025
911055b
provider: network operations (#1115)
guillaumemichel Aug 5, 2025
99929e4
provider: ProvideStatus interface (#1110)
guillaumemichel Aug 8, 2025
3e8ebb4
provider: schedule prefix length (#1116)
guillaumemichel Aug 13, 2025
648fa03
provider: schedule (#1117)
guillaumemichel Aug 13, 2025
ea7a839
provider: handleProvide (#1118)
guillaumemichel Aug 13, 2025
582e3c4
provider: swarm exploration (#1120)
guillaumemichel Aug 13, 2025
e3761f9
provider: batch provide (#1121)
guillaumemichel Aug 13, 2025
fb1534c
provider: batch reprovide (#1122)
guillaumemichel Aug 13, 2025
1704edb
provider: catchup pending work (#1123)
guillaumemichel Aug 13, 2025
0cb418a
provider: options (#1124)
guillaumemichel Aug 13, 2025
a67da4b
provide: handle reprovide (#1125)
guillaumemichel Aug 13, 2025
f0631c4
provider: daemon (#1126)
guillaumemichel Aug 13, 2025
d5c2df1
provider: integration tests (#1127)
guillaumemichel Aug 13, 2025
3bf45a4
go.mod conflicts
guillaumemichel Aug 19, 2025
ca7bfb8
provider: refresh schedule (#1131)
guillaumemichel Aug 19, 2025
6969f64
dual: provider (#1132)
guillaumemichel Aug 19, 2025
160a41e
provider: minor fixes (#1133)
guillaumemichel Aug 22, 2025
2f250a5
provider: connectivity state machine (#1135)
guillaumemichel Sep 4, 2025
ade2c00
provider: use synctest in time based tests #1136
guillaumemichel Sep 4, 2025
cd54443
keystore: revamp (#1142)
guillaumemichel Sep 8, 2025
ba690c7
provider: ResettableKeyStore (#1146)
guillaumemichel Sep 16, 2025
93ac606
keystore: remove mutex (#1147)
guillaumemichel Sep 16, 2025
e2f65e5
buffered provider (#1149)
guillaumemichel Sep 16, 2025
0181891
provider: minor fixes (#1150)
guillaumemichel Sep 16, 2025
efcdc80
rename KeyStore -> Keystore (#1151)
guillaumemichel Sep 16, 2025
a5b7683
provider: more minor fixes (#1152)
guillaumemichel Sep 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
provider: schedule (#1117)
* provider: adding provide and reprovide queue

* provider: network operations

* add some tests

* schedule prefix len computations

* provider schedule

* addressed review

* use go-test/random

* satisfy linter

* log errors during initial prefix len measurement

* address review

* satisfy linter

* simplify unscheduleSubsumedPrefixesNoClock

* moved maxPrefixSize const to top
  • Loading branch information
guillaumemichel committed Sep 17, 2025
commit 648fa038a6b3033ca4c098c48d5cf96c425f1646
153 changes: 150 additions & 3 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package provider
import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -61,6 +62,10 @@ type DHTProvider interface {

var _ DHTProvider = &SweepingProvider{}

// maxPrefixSize is the maximum size of a prefix used to define a keyspace
// region.
const maxPrefixSize = 24

var logger = logging.Logger("dht/SweepingProvider")

type KadClosestPeersRouter interface {
Expand All @@ -76,11 +81,17 @@ type SweepingProvider struct {
order bit256.Key
router KadClosestPeersRouter

clock clock.Clock

maxProvideConnsPerWorker int

schedule *trie.Trie[bitstr.Key, time.Duration]
clock clock.Clock
cycleStart time.Time
reprovideInterval time.Duration
maxReprovideDelay time.Duration

schedule *trie.Trie[bitstr.Key, time.Duration]
scheduleCursor bitstr.Key
scheduleTimer *clock.Timer
scheduleTimerStartedAt time.Time

avgPrefixLenLk sync.Mutex
avgPrefixLenReady chan struct{}
Expand All @@ -99,6 +110,7 @@ func (s *SweepingProvider) SatisfyLinter() {
s.closestPeersToKey("")
s.measureInitialPrefixLen()
s.getAvgPrefixLenNoLock()
s.schedulePrefixNoLock("", false)
}

// Close stops the provider and releases all resources.
Expand All @@ -115,6 +127,141 @@ func (s *SweepingProvider) closed() bool {
}
}

// scheduleNextReprovideNoLock makes sure the scheduler wakes up in
// `timeUntilReprovide` to reprovide the region identified by `prefix`.
func (s *SweepingProvider) scheduleNextReprovideNoLock(prefix bitstr.Key, timeUntilReprovide time.Duration) {
s.scheduleCursor = prefix
s.scheduleTimer.Reset(timeUntilReprovide)
s.scheduleTimerStartedAt = s.clock.Now()
}

// schedulePrefixNoLock adds the supplied prefix to the schedule, unless
// already present.
//
// If `justReprovided` is true, it will schedule the next reprovide at most
// s.reprovideInterval+s.maxReprovideDelay in the future, allowing the
// reprovide to be delayed of at most maxReprovideDelay.
//
// If the supplied prefix is the next prefix to be reprovided, update the
// schedule cursor and timer.
func (s *SweepingProvider) schedulePrefixNoLock(prefix bitstr.Key, justReprovided bool) {
nextReprovideTime := s.reprovideTimeForPrefix(prefix)
if justReprovided {
// Schedule next reprovide given that the prefix was just reprovided on
// schedule. In the case the next reprovide time should be delayed due to a
// growth in the number of network peers matching the prefix, don't delay
// more than s.maxReprovideDelay.
nextReprovideTime = min(nextReprovideTime, s.currentTimeOffset()+s.reprovideInterval+s.maxReprovideDelay)
}
// If schedule contains keys starting with prefix, remove them to avoid
// overlap.
if _, ok := keyspace.FindPrefixOfKey(s.schedule, prefix); ok {
// Already scheduled.
return
}
// Unschedule superstrings in schedule if any.
s.unscheduleSubsumedPrefixesNoLock(prefix)

s.schedule.Add(prefix, nextReprovideTime)

// Check if the prefix that was just added is the next one to be reprovided.
if s.schedule.IsNonEmptyLeaf() {
// The prefix we insterted is the only element in the schedule.
timeUntilPrefixReprovide := s.timeUntil(nextReprovideTime)
s.scheduleNextReprovideNoLock(prefix, timeUntilPrefixReprovide)
return
}
followingKey := keyspace.NextNonEmptyLeaf(s.schedule, prefix, s.order).Key
if followingKey == s.scheduleCursor {
// The key following prefix is the schedule cursor.
timeUntilPrefixReprovide := s.timeUntil(nextReprovideTime)
_, scheduledAlarm := trie.Find(s.schedule, s.scheduleCursor)
if timeUntilPrefixReprovide < s.timeUntil(scheduledAlarm) {
s.scheduleNextReprovideNoLock(prefix, timeUntilPrefixReprovide)
}
}
}

// unscheduleSubsumedPrefixes removes all superstrings of `prefix` that are
// scheduled in the future. Assumes that the schedule lock is held.
func (s *SweepingProvider) unscheduleSubsumedPrefixesNoLock(prefix bitstr.Key) {
// Pop prefixes scheduled in the future being covered by the explored peers.
keyspace.PruneSubtrie(s.schedule, prefix)

// If we removed s.scheduleCursor from schedule, select the next one
if keyspace.IsBitstrPrefix(prefix, s.scheduleCursor) {
next := keyspace.NextNonEmptyLeaf(s.schedule, s.scheduleCursor, s.order)
if next == nil {
s.scheduleNextReprovideNoLock(prefix, s.reprovideInterval)
} else {
timeUntilReprovide := s.timeUntil(next.Data)
s.scheduleNextReprovideNoLock(next.Key, timeUntilReprovide)
logger.Warnf("next scheduled prefix now is %s", s.scheduleCursor)
}
}
}

// currentTimeOffset returns the current time offset in the reprovide cycle.
func (s *SweepingProvider) currentTimeOffset() time.Duration {
return s.timeOffset(s.clock.Now())
}

// timeOffset returns the time offset in the reprovide cycle for the given
// time.
func (s *SweepingProvider) timeOffset(t time.Time) time.Duration {
return t.Sub(s.cycleStart) % s.reprovideInterval
}

// timeUntil returns the time left (duration) until the given time offset.
func (s *SweepingProvider) timeUntil(d time.Duration) time.Duration {
return s.timeBetween(s.currentTimeOffset(), d)
}

// timeBetween returns the duration between the two provided offsets, assuming
// it is no more than s.reprovideInterval.
func (s *SweepingProvider) timeBetween(from, to time.Duration) time.Duration {
return (to-from+s.reprovideInterval-1)%s.reprovideInterval + 1
}

// reprovideTimeForPrefix calculates the scheduled time offset for reproviding
// keys associated with a given prefix based on its bitstring prefix. The
// function maps the given binary prefix to a fraction of the overall reprovide
// interval (s.reprovideInterval), such that keys with prefixes closer to a
// configured target s.order (in XOR distance) are scheduled earlier and those
// further away later in the cycle.
//
// For any prefix of bit length n, the function generates 2^n distinct
// reprovide times that evenly partition the entire reprovide interval. The
// process first truncates s.order to n bits and then XORs it with the provided
// prefix. The resulting binary string is converted to an integer,
// corresponding to the index of the 2^n possible reprovide times to use for
// the prefix.
//
// This method ensures a deterministic and evenly distributed reprovide
// schedule, where the temporal position within the cycle is based on the
// binary representation of the key's prefix.
func (s *SweepingProvider) reprovideTimeForPrefix(prefix bitstr.Key) time.Duration {
if len(prefix) == 0 {
// Empty prefix: all reprovides occur at the beginning of the cycle.
return 0
}
if len(prefix) > maxPrefixSize {
// Truncate the prefix to the maximum allowed size to avoid overly fine
// slicing of time.
prefix = prefix[:maxPrefixSize]
}
// Number of possible bitstrings of the same length as prefix.
maxInt := int64(1 << len(prefix))
// XOR the prefix with the order key to reorder the schedule: keys "close" to
// s.order are scheduled first in the cycle, and those "far" from it are
// scheduled later.
order := bitstr.Key(key.BitString(s.order)[:len(prefix)])
k := prefix.Xor(order)
val, _ := strconv.ParseInt(string(k), 2, 64)
// Calculate the time offset as a fraction of the overall reprovide interval.
return time.Duration(int64(s.reprovideInterval) * val / maxInt)
}

const initialGetClosestPeers = 4

// measureInitialPrefixLen makes a few GetClosestPeers calls to get an estimate
Expand Down
30 changes: 30 additions & 0 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package provider

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"strconv"
"testing"
"time"

"github.com/ipfs/go-test/random"
kb "github.com/libp2p/go-libp2p-kbucket"
Expand Down Expand Up @@ -116,3 +118,31 @@ func TestKeysAllocationsToPeers(t *testing.T) {
}
}
}

func TestReprovideTimeForPrefixWithOrderZero(t *testing.T) {
s := SweepingProvider{
reprovideInterval: 16 * time.Second,
order: bit256.ZeroKey(),
}

require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("0"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("1"))
require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("000"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("1000"))
require.Equal(t, 10*time.Second, s.reprovideTimeForPrefix("1010"))
require.Equal(t, 15*time.Second, s.reprovideTimeForPrefix("1111"))
}

func TestReprovideTimeForPrefixWithCustomOrder(t *testing.T) {
s := SweepingProvider{
reprovideInterval: 16 * time.Second,
order: bit256.NewKey(bytes.Repeat([]byte{0xFF}, 32)), // 111...1
}

require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("1"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("0"))
require.Equal(t, 0*time.Second, s.reprovideTimeForPrefix("111"))
require.Equal(t, 8*time.Second, s.reprovideTimeForPrefix("0111"))
require.Equal(t, 10*time.Second, s.reprovideTimeForPrefix("0101"))
require.Equal(t, 15*time.Second, s.reprovideTimeForPrefix("0000"))
}