diff --git a/provider/provider.go b/provider/provider.go index e48dfa388..3e0b781e6 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -9,13 +9,16 @@ import ( "github.com/filecoin-project/go-clock" logging "github.com/ipfs/go-log/v2" + "github.com/ipfs/go-test/random" kb "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" mh "github.com/multiformats/go-multihash" "github.com/probe-lab/go-libdht/kad/key" + "github.com/probe-lab/go-libdht/kad/key/bit256" "github.com/probe-lab/go-libdht/kad/key/bitstr" + "github.com/probe-lab/go-libdht/kad/trie" pb "github.com/libp2p/go-libp2p-kad-dht/pb" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" @@ -66,13 +69,25 @@ type KadClosestPeersRouter interface { type SweepingProvider struct { // TODO: complete me + done chan struct{} + closeOnce sync.Once + peerid peer.ID + order bit256.Key router KadClosestPeersRouter clock clock.Clock maxProvideConnsPerWorker int + schedule *trie.Trie[bitstr.Key, time.Duration] + + avgPrefixLenLk sync.Mutex + avgPrefixLenReady chan struct{} + cachedAvgPrefixLen int + lastAvgPrefixLen time.Time + avgPrefixLenValidity time.Duration + msgSender pb.MessageSender getSelfAddrs func() []ma.Multiaddr addLocalRecord func(mh.Multihash) error @@ -82,6 +97,115 @@ type SweepingProvider struct { func (s *SweepingProvider) SatisfyLinter() { s.vanillaProvide([]byte{}) s.closestPeersToKey("") + s.measureInitialPrefixLen() + s.getAvgPrefixLenNoLock() +} + +// Close stops the provider and releases all resources. +func (s *SweepingProvider) Close() { + s.closeOnce.Do(func() { close(s.done) }) +} + +func (s *SweepingProvider) closed() bool { + select { + case <-s.done: + return true + default: + return false + } +} + +const initialGetClosestPeers = 4 + +// measureInitialPrefixLen makes a few GetClosestPeers calls to get an estimate +// of the prefix length to be used in the network. +// +// This function blocks until GetClosestPeers succeeds or the provider is +// closed. No provide operation can happen until this function returns. +func (s *SweepingProvider) measureInitialPrefixLen() { + cplSum := atomic.Int32{} + cplSamples := atomic.Int32{} + wg := sync.WaitGroup{} + wg.Add(initialGetClosestPeers) + for range initialGetClosestPeers { + go func() { + defer wg.Done() + randomMh := random.Multihashes(1)[0] + for { + if s.closed() { + return + } + peers, err := s.router.GetClosestPeers(context.Background(), string(randomMh)) + if err != nil { + logger.Infof("GetClosestPeers failed during initial prefix len measurement: %s", err) + } else if len(peers) == 0 { + logger.Info("GetClosestPeers found not peers during initial prefix len measurement") + } else { + if len(peers) <= 2 { + return // Ignore result if only 2 other peers in DHT. + } + cpl := keyspace.KeyLen + firstPeerKey := keyspace.PeerIDToBit256(peers[0]) + for _, p := range peers[1:] { + cpl = min(cpl, key.CommonPrefixLength(firstPeerKey, keyspace.PeerIDToBit256(p))) + } + cplSum.Add(int32(cpl)) + cplSamples.Add(1) + return + } + + s.clock.Sleep(time.Second) // retry every second until success + } + }() + } + wg.Wait() + + nSamples := cplSamples.Load() + s.avgPrefixLenLk.Lock() + defer s.avgPrefixLenLk.Unlock() + if nSamples == 0 { + s.cachedAvgPrefixLen = 0 + } else { + s.cachedAvgPrefixLen = int(cplSum.Load() / nSamples) + } + logger.Debugf("initial avgPrefixLen is %d", s.cachedAvgPrefixLen) + s.lastAvgPrefixLen = s.clock.Now() + close(s.avgPrefixLenReady) +} + +// getAvgPrefixLenNoLock returns the average prefix length of all scheduled +// prefixes. +// +// Hangs until the first measurement is done if the average prefix length is +// missing. +func (s *SweepingProvider) getAvgPrefixLenNoLock() int { + s.avgPrefixLenLk.Lock() + defer s.avgPrefixLenLk.Unlock() + + if s.cachedAvgPrefixLen == -1 { + // Wait for initial measurement to complete. Requires the node to come + // online. + s.avgPrefixLenLk.Unlock() + <-s.avgPrefixLenReady + s.avgPrefixLenLk.Lock() + return s.cachedAvgPrefixLen + } + + if s.lastAvgPrefixLen.Add(s.avgPrefixLenValidity).After(s.clock.Now()) { + // Return cached value if it is still valid. + return s.cachedAvgPrefixLen + } + prefixLenSum := 0 + scheduleSize := s.schedule.Size() + if scheduleSize > 0 { + // Take average prefix length of all scheduled prefixes. + for _, entry := range keyspace.AllEntries(s.schedule, s.order) { + prefixLenSum += len(entry.Key) + } + s.cachedAvgPrefixLen = prefixLenSum / scheduleSize + s.lastAvgPrefixLen = s.clock.Now() + } + return s.cachedAvgPrefixLen } // vanillaProvide provides a single key to the network without any