Skip to content
Merged
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (

"github.com/filecoin-project/go-clock"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-test/random"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"

"github.com/probe-lab/go-libdht/kad/key"
"github.com/probe-lab/go-libdht/kad/key/bit256"
"github.com/probe-lab/go-libdht/kad/key/bitstr"
"github.com/probe-lab/go-libdht/kad/trie"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace"
Expand Down Expand Up @@ -66,13 +69,25 @@ type KadClosestPeersRouter interface {

type SweepingProvider struct {
// TODO: complete me
done chan struct{}
closeOnce sync.Once

peerid peer.ID
order bit256.Key
router KadClosestPeersRouter

clock clock.Clock

maxProvideConnsPerWorker int

schedule *trie.Trie[bitstr.Key, time.Duration]

avgPrefixLenLk sync.Mutex
avgPrefixLenReady chan struct{}
cachedAvgPrefixLen int
lastAvgPrefixLen time.Time
avgPrefixLenValidity time.Duration

msgSender pb.MessageSender
getSelfAddrs func() []ma.Multiaddr
addLocalRecord func(mh.Multihash) error
Expand All @@ -82,6 +97,115 @@ type SweepingProvider struct {
func (s *SweepingProvider) SatisfyLinter() {
s.vanillaProvide([]byte{})
s.closestPeersToKey("")
s.measureInitialPrefixLen()
s.getAvgPrefixLenNoLock()
}

// Close stops the provider and releases all resources.
func (s *SweepingProvider) Close() {
s.closeOnce.Do(func() { close(s.done) })
}

func (s *SweepingProvider) closed() bool {
select {
case <-s.done:
return true
default:
return false
}
}

const initialGetClosestPeers = 4

// measureInitialPrefixLen makes a few GetClosestPeers calls to get an estimate
// of the prefix length to be used in the network.
//
// This function blocks until GetClosestPeers succeeds or the provider is
// closed. No provide operation can happen until this function returns.
func (s *SweepingProvider) measureInitialPrefixLen() {
cplSum := atomic.Int32{}
cplSamples := atomic.Int32{}
wg := sync.WaitGroup{}
wg.Add(initialGetClosestPeers)
for range initialGetClosestPeers {
go func() {
defer wg.Done()
randomMh := random.Multihashes(1)[0]
for {
if s.closed() {
return
}
peers, err := s.router.GetClosestPeers(context.Background(), string(randomMh))
if err != nil {
logger.Infof("GetClosestPeers failed during initial prefix len measurement: %s", err)
} else if len(peers) == 0 {
logger.Info("GetClosestPeers found not peers during initial prefix len measurement")
} else {
if len(peers) <= 2 {
return // Ignore result if only 2 other peers in DHT.
}
cpl := keyspace.KeyLen
firstPeerKey := keyspace.PeerIDToBit256(peers[0])
for _, p := range peers[1:] {
cpl = min(cpl, key.CommonPrefixLength(firstPeerKey, keyspace.PeerIDToBit256(p)))
}
cplSum.Add(int32(cpl))
cplSamples.Add(1)
return
}

s.clock.Sleep(time.Second) // retry every second until success
}
}()
}
wg.Wait()

nSamples := cplSamples.Load()
s.avgPrefixLenLk.Lock()
defer s.avgPrefixLenLk.Unlock()
if nSamples == 0 {
s.cachedAvgPrefixLen = 0
} else {
s.cachedAvgPrefixLen = int(cplSum.Load() / nSamples)
}
logger.Debugf("initial avgPrefixLen is %d", s.cachedAvgPrefixLen)
s.lastAvgPrefixLen = s.clock.Now()
s.avgPrefixLenReady <- struct{}{}
}

// getAvgPrefixLenNoLock returns the average prefix length of all scheduled
// prefixes.
//
// Hangs until the first measurement is done if the average prefix length is
// missing.
func (s *SweepingProvider) getAvgPrefixLenNoLock() int {
s.avgPrefixLenLk.Lock()
defer s.avgPrefixLenLk.Unlock()

if s.cachedAvgPrefixLen == -1 {
// Wait for initial measurement to complete. Requires the node to come
// online.
s.avgPrefixLenLk.Unlock()
<-s.avgPrefixLenReady
s.avgPrefixLenLk.Lock()
return s.cachedAvgPrefixLen
}

if s.lastAvgPrefixLen.Add(s.avgPrefixLenValidity).After(s.clock.Now()) {
// Return cached value if it is still valid.
return s.cachedAvgPrefixLen
}
prefixLenSum := 0
scheduleSize := s.schedule.Size()
if scheduleSize > 0 {
// Take average prefix length of all scheduled prefixes.
for _, entry := range keyspace.AllEntries(s.schedule, s.order) {
prefixLenSum += len(entry.Key)
}
s.cachedAvgPrefixLen = prefixLenSum / scheduleSize
s.lastAvgPrefixLen = s.clock.Now()
}
return s.cachedAvgPrefixLen
}

// vanillaProvide provides a single key to the network without any
Expand Down
Loading