diff --git a/provider/provider.go b/provider/provider.go
index 8ed30b2e2..87620dfac 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -897,7 +897,7 @@ func (s *SweepingProvider) handleReprovide() {
 	s.wg.Add(1)
 	go func() {
-		if err := s.workerPool.Acquire(periodicWorker); err != nil {
+		if err := s.workerPool.Acquire(periodicWorker); err == nil {
 			s.batchReprovide(currentPrefix, true)
 			s.workerPool.Release(periodicWorker)
 		}
@@ -1319,7 +1319,6 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe
 			continue
 		}
 		s.provideCounter.Add(s.ctx, int64(len(allKeys)))
-
 	}
 	// If at least 1 region was provided, we don't consider it a failure.
 	return errCount < len(regions)
diff --git a/provider/provider_test.go b/provider/provider_test.go
index 51c3564a0..c43deb0b9 100644
--- a/provider/provider_test.go
+++ b/provider/provider_test.go
@@ -25,6 +25,7 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest/observer"
 
+	"github.com/probe-lab/go-libdht/kad/key"
 	"github.com/probe-lab/go-libdht/kad/key/bit256"
 	"github.com/probe-lab/go-libdht/kad/key/bitstr"
 	"github.com/probe-lab/go-libdht/kad/trie"
@@ -51,6 +52,30 @@ func genMultihashes(n int) []mh.Multihash {
 	return mhs
 }
 
+// genBalancedMultihashes generates 2^exponent multihashes, with balanced
+// prefixes, in a random order.
+//
+// e.g. genBalancedMultihashes(3) will generate 8 multihashes, with each
+// Kademlia identifier starting with a distinct prefix (000, 001, 010, ...,
+// 111) of length 3.
+func genBalancedMultihashes(exponent int) []mh.Multihash {
+	n := 1 << exponent
+	mhs := make([]mh.Multihash, 0, n)
+	seen := make(map[bitstr.Key]struct{}, n)
+	for i := 0; len(mhs) < n; i++ {
+		h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1)
+		if err != nil {
+			panic(err)
+		}
+		prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:exponent])
+		if _, ok := seen[prefix]; !ok {
+			mhs = append(mhs, h)
+			seen[prefix] = struct{}{}
+		}
+	}
+	return mhs
+}
+
 func genMultihashesMatchingPrefix(prefix bitstr.Key, n int) []mh.Multihash {
 	mhs := make([]mh.Multihash, 0, n)
 	for i := 0; len(mhs) < n; i++ {
@@ -727,3 +752,409 @@ func TestClose(t *testing.T) {
 	err = prov.workerPool.Acquire(burstWorker)
 	require.ErrorIs(t, err, reservedpool.ErrClosed)
 }
+
+func TestProvideOnce(t *testing.T) {
+	pid, err := peer.Decode("12BoooooPEER")
+	require.NoError(t, err)
+
+	online := atomic.Bool{}
+	online.Store(true) // start online
+	router := &mockRouter{
+		getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) {
+			if online.Load() {
+				return []peer.ID{pid}, nil
+			}
+			return nil, errors.New("offline")
+		},
+	}
+	provideCount := atomic.Int32{}
+	msgSender := &mockMsgSender{
+		sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error {
+			provideCount.Add(1)
+			return nil
+		},
+	}
+	clk := clock.NewMock()
+
+	checkInterval := time.Minute
+
+	opts := []Option{
+		WithPeerID(pid),
+		WithRouter(router),
+		WithMessageSender(msgSender),
+		WithSelfAddrs(func() []ma.Multiaddr {
+			addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
+			require.NoError(t, err)
+			return []ma.Multiaddr{addr}
+		}),
+		WithClock(clk),
+		WithConnectivityCheckOnlineInterval(checkInterval),
+		WithConnectivityCheckOfflineInterval(checkInterval),
+	}
+	reprovider, err := New(opts...)
+	require.NoError(t, err)
+	defer reprovider.Close()
+	reprovider.avgPrefixLenLk.Lock()
+	reprovider.cachedAvgPrefixLen = 4
+	reprovider.avgPrefixLenLk.Unlock()
+
+	h := genMultihashes(1)[0]
+
+	// Set the reprovider as offline.
+	online.Store(false)
+	reprovider.connectivity.TriggerCheck()
+	time.Sleep(5 * time.Millisecond) // wait for connectivity check to finish
+	reprovider.ProvideOnce(h)
+	time.Sleep(5 * time.Millisecond) // wait for ProvideOnce to finish
+	require.Equal(t, int32(0), provideCount.Load(), "should not have provided when offline")
+	// Ensure the key is in the provide queue.
+	_, keys, ok := reprovider.provideQueue.Dequeue()
+	require.True(t, ok)
+	require.Equal(t, 1, len(keys))
+	require.Equal(t, h, keys[0])
+
+	// Set the reprovider as online.
+	online.Store(true)
+	clk.Add(checkInterval)           // trigger connectivity check
+	time.Sleep(5 * time.Millisecond) // wait for connectivity check to finish
+	reprovider.ProvideOnce(h)
+	waitUntil(t, func() bool { return provideCount.Load() == 1 }, 100*time.Millisecond, "waiting for ProvideOnce to finish")
+}
+
+func TestStartProvidingSingle(t *testing.T) {
+	pid, err := peer.Decode("12BoooooPEER")
+	require.NoError(t, err)
+	replicationFactor := 4
+	h := genMultihashes(1)[0]
+
+	mockClock := clock.NewMock()
+	reprovideInterval := time.Hour
+
+	prefixLen := 4
+	peers := make([]peer.ID, replicationFactor)
+	peers[0], err = peer.Decode("12BooooPEER1")
+	require.NoError(t, err)
+	kbKey := keyspace.KeyToBytes(keyspace.PeerIDToBit256(peers[0]))
+	for i := range peers[1:] {
+		peers[i+1], err = kb.GenRandPeerIDWithCPL(kbKey, uint(prefixLen))
+		require.NoError(t, err)
+	}
+
+	getClosestPeersCount := atomic.Int32{}
+	router := &mockRouter{
+		getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) {
+			getClosestPeersCount.Add(1)
+			return peers, nil
+		},
+	}
+	provideCount := atomic.Int32{}
+	msgSender := &mockMsgSender{
+		sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error {
+			provideCount.Add(1)
+			return nil
+		},
+	}
+	opts := []Option{
+		WithReplicationFactor(replicationFactor),
+		WithReprovideInterval(reprovideInterval),
+		WithPeerID(pid),
+		WithRouter(router),
+		WithMessageSender(msgSender),
+		WithSelfAddrs(func() []ma.Multiaddr {
+			addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
+			require.NoError(t, err)
+			return []ma.Multiaddr{addr}
+		}),
+		WithClock(mockClock),
+	}
+	reprovider, err := New(opts...)
+	require.NoError(t, err)
+	defer reprovider.Close()
+
+	// Blocks until key is provided.
+	reprovider.StartProviding(true, h)
+	waitUntil(t, func() bool { return provideCount.Load() == int32(len(peers)) }, 100*time.Millisecond, "waiting for initial provide to finish")
+	require.Equal(t, 1+initialGetClosestPeersCount, int(getClosestPeersCount.Load()))
+
+	// Verify reprovide is scheduled.
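+	// The schedule is a trie keyed by keyspace prefix: StartProviding should
+	// have inserted a single entry matching the first prefixLen bits of the
+	// key, with its reprovide time derived from that prefix.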
+	prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:prefixLen])
+	reprovider.scheduleLk.Lock()
+	require.Equal(t, 1, reprovider.schedule.Size())
+	found, reprovideTime := trie.Find(reprovider.schedule, prefix)
+	if !found {
+		t.Log(prefix)
+		t.Log(keyspace.AllEntries(reprovider.schedule, reprovider.order)[0].Key)
+		t.Fatal("prefix not inserted in schedule")
+	}
+	require.Equal(t, reprovider.reprovideTimeForPrefix(prefix), reprovideTime)
+	reprovider.scheduleLk.Unlock()
+
+	// Try to provide the same key again -> noop.
+	reprovider.StartProviding(false, h)
+	time.Sleep(5 * time.Millisecond)
+	require.Equal(t, int32(len(peers)), provideCount.Load())
+	require.Equal(t, 1+initialGetClosestPeersCount, int(getClosestPeersCount.Load()))
+
+	// Verify reprovide happens as scheduled.
+	mockClock.Add(reprovideTime - 1)
+	require.Equal(t, 1+initialGetClosestPeersCount, int(getClosestPeersCount.Load()))
+	require.Equal(t, int32(len(peers)), provideCount.Load())
+	mockClock.Add(1)
+	waitUntil(t, func() bool { return provideCount.Load() == 2*int32(len(peers)) }, 200*time.Millisecond, "waiting for reprovide to finish 0")
+	require.Equal(t, 2+initialGetClosestPeersCount, int(getClosestPeersCount.Load()))
+	mockClock.Add(reprovideInterval - 1)
+	require.Equal(t, 2+initialGetClosestPeersCount, int(getClosestPeersCount.Load()))
+	require.Equal(t, 2*int32(len(peers)), provideCount.Load())
+	mockClock.Add(reprovideInterval) // covers the remaining 1 to cross the next reprovide boundary
+	waitUntil(t, func() bool { return provideCount.Load() == 3*int32(len(peers)) }, 200*time.Millisecond, "waiting for reprovide to finish 1")
+	require.Equal(t, 3+initialGetClosestPeersCount, int(getClosestPeersCount.Load()))
+}
+
+const bitsPerByte = 8
+
+func TestStartProvidingMany(t *testing.T) {
+	pid, err := peer.Decode("12BoooooPEER")
+	require.NoError(t, err)
+
+	nKeysExponent := 10
+	nKeys := 1 << nKeysExponent
+	mhs := genBalancedMultihashes(nKeysExponent)
+
+	replicationFactor := 4
+	peerPrefixBitlen := 6
+	require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte)
+	var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen
+	peers := make([]peer.ID, nPeers)
+	for i := range nPeers {
+		b := i << (bitsPerByte - peerPrefixBitlen)
+		k := [32]byte{b}
+		peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen))
+		require.NoError(t, err)
+	}
+
+	mockClock := clock.NewMock()
+	reprovideInterval := time.Hour
+
+	router := &mockRouter{
+		getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) {
+			sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))
+			return sortedPeers[:min(replicationFactor, len(peers))], nil
+		},
+	}
+	msgSenderLk := sync.Mutex{}
+	addProviderRpcs := make(map[string]map[peer.ID]int) // key -> peerid -> count
+	provideCount := atomic.Int32{}
+	msgSender := &mockMsgSender{
+		sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error {
+			msgSenderLk.Lock()
+			defer msgSenderLk.Unlock()
+			_, k, err := mh.MHFromBytes(m.GetKey())
+			require.NoError(t, err)
+			if _, ok := addProviderRpcs[string(k)]; !ok {
+				addProviderRpcs[string(k)] = make(map[peer.ID]int)
+			}
+			addProviderRpcs[string(k)][p]++
+			provideCount.Add(1)
+			return nil
+		},
+	}
+	opts := []Option{
+		WithReprovideInterval(reprovideInterval),
+		WithReplicationFactor(replicationFactor),
+		WithMaxWorkers(1),
+		WithDedicatedBurstWorkers(0),
+		WithDedicatedPeriodicWorkers(0),
+		WithPeerID(pid),
+		WithRouter(router),
+		WithMessageSender(msgSender),
+		WithSelfAddrs(func() []ma.Multiaddr {
+			addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
+			require.NoError(t, err)
+			return []ma.Multiaddr{addr}
+		}),
+		WithClock(mockClock),
+	}
+	reprovider, err := New(opts...)
+	require.NoError(t, err)
+	defer reprovider.Close()
+
+	reprovider.StartProviding(true, mhs...)
+	waitUntil(t, func() bool { return provideCount.Load() == int32(len(mhs)*replicationFactor) }, 100*time.Millisecond, "waiting for initial provides to finish")
+
+	// Each key should have been provided at least once.
+	msgSenderLk.Lock()
+	require.Equal(t, nKeys, len(addProviderRpcs))
+	for k, holders := range addProviderRpcs {
+		// Verify that all keys have been provided to exactly replicationFactor
+		// distinct peers.
+		require.Len(t, holders, replicationFactor)
+		for _, count := range holders {
+			require.Equal(t, 1, count)
+		}
+		// Verify provider records are assigned to the closest peers.
+		closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor]
+		for _, p := range closestPeers {
+			require.Contains(t, holders, p)
+		}
+	}
+
+	step := 10 * time.Second
+	// Test reprovides, clear addProviderRpcs.
+	clear(addProviderRpcs)
+	msgSenderLk.Unlock()
+	for range reprovideInterval / step {
+		mockClock.Add(step)
+	}
+	waitUntil(t, func() bool { return provideCount.Load() == 2*int32(len(mhs)*replicationFactor) }, 200*time.Millisecond, "waiting for reprovide to finish 0")
+
+	msgSenderLk.Lock()
+	require.Equal(t, nKeys, len(addProviderRpcs))
+	for k, holders := range addProviderRpcs {
+		// Verify that all keys have been provided to exactly replicationFactor
+		// distinct peers.
+		require.Len(t, holders, replicationFactor, key.BitString(keyspace.MhToBit256([]byte(k))))
+		for _, count := range holders {
+			require.Equal(t, 1, count)
+		}
+		// Verify provider records are assigned to the closest peers.
+		closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor]
+		for _, p := range closestPeers {
+			require.Contains(t, holders, p)
+		}
+	}
+
+	// Test reprovides again, clear addProviderRpcs.
+	clear(addProviderRpcs)
+	msgSenderLk.Unlock()
+	for range reprovideInterval / step {
+		mockClock.Add(step)
+	}
+	waitUntil(t, func() bool { return provideCount.Load() == 3*int32(len(mhs)*replicationFactor) }, 200*time.Millisecond, "waiting for reprovide to finish 1")
+
+	msgSenderLk.Lock()
+	require.Equal(t, nKeys, len(addProviderRpcs))
+	for k, holders := range addProviderRpcs {
+		// Verify that all keys have been provided to exactly replicationFactor
+		// distinct peers.
+		require.Len(t, holders, replicationFactor)
+		for _, count := range holders {
+			require.Equal(t, 1, count)
+		}
+		// Verify provider records are assigned to the closest peers.
+		closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor]
+		for _, p := range closestPeers {
+			require.Contains(t, holders, p)
+		}
+	}
+	msgSenderLk.Unlock()
+}
+
+func TestStartProvidingUnstableNetwork(t *testing.T) {
+	pid, err := peer.Decode("12BoooooPEER")
+	require.NoError(t, err)
+
+	nKeysExponent := 10
+	nKeys := 1 << nKeysExponent
+	mhs := genBalancedMultihashes(nKeysExponent)
+
+	replicationFactor := 4
+	peerPrefixBitlen := 6
+	require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte)
+	var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen
+	peers := make([]peer.ID, nPeers)
+	for i := range nPeers {
+		b := i << (bitsPerByte - peerPrefixBitlen)
+		k := [32]byte{b}
+		peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen))
+		require.NoError(t, err)
+	}
+
+	mockClock := clock.NewMock()
+	reprovideInterval := time.Hour
+	connectivityCheckInterval := time.Second
+
+	routerOffline := atomic.Bool{}
+	router := &mockRouter{
+		getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) {
+			if routerOffline.Load() {
+				return nil, errors.New("offline")
+			}
+			sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))
+			return sortedPeers[:min(replicationFactor, len(peers))], nil
+		},
+	}
+	msgSenderLk := sync.Mutex{}
+	addProviderRpcs := make(map[string]map[peer.ID]int) // key -> peerid -> count
+	provideCount := atomic.Int32{}
+	msgSender := &mockMsgSender{
+		sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error {
+			msgSenderLk.Lock()
+			defer msgSenderLk.Unlock()
+			if routerOffline.Load() {
+				return errors.New("offline")
+			}
+			_, k, err := mh.MHFromBytes(m.GetKey())
+			require.NoError(t, err)
+			if _, ok := addProviderRpcs[string(k)]; !ok {
+				addProviderRpcs[string(k)] = make(map[peer.ID]int)
+			}
+			addProviderRpcs[string(k)][p]++
+			provideCount.Add(1)
+			return nil
+		},
+	}
+	opts := []Option{
+		WithReprovideInterval(reprovideInterval),
+		WithReplicationFactor(replicationFactor),
+		WithMaxWorkers(1),
+		WithDedicatedBurstWorkers(0),
+		WithDedicatedPeriodicWorkers(0),
+		WithPeerID(pid),
+		WithRouter(router),
+		WithMessageSender(msgSender),
+		WithSelfAddrs(func() []ma.Multiaddr {
+			addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
+			require.NoError(t, err)
+			return []ma.Multiaddr{addr}
+		}),
+		WithClock(mockClock),
+		WithConnectivityCheckOnlineInterval(connectivityCheckInterval),
+		WithConnectivityCheckOfflineInterval(connectivityCheckInterval),
+	}
+	reprovider, err := New(opts...)
+	require.NoError(t, err)
+	defer reprovider.Close()
+	time.Sleep(10 * time.Millisecond)
+	routerOffline.Store(true)
+
+	reprovider.StartProviding(true, mhs...)
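+	// The router is offline, so the provide attempts must fail and the keys
+	// should remain queued until connectivity comes back.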
+	time.Sleep(10 * time.Millisecond) // wait for StartProviding to finish
+	require.Equal(t, int32(0), provideCount.Load(), "should not have provided when offline")
+
+	nodeOffline := func() bool {
+		return !reprovider.connectivity.IsOnline()
+	}
+	waitUntil(t, nodeOffline, 100*time.Millisecond, "waiting for node to be offline")
+	mockClock.Add(connectivityCheckInterval)
+
+	routerOffline.Store(false)
+	mockClock.Add(connectivityCheckInterval)
+	waitUntil(t, reprovider.connectivity.IsOnline, 100*time.Millisecond, "waiting for node to come back online")
+
+	providedAllKeys := func() bool {
+		msgSenderLk.Lock()
+		defer msgSenderLk.Unlock()
+		if len(addProviderRpcs) != nKeys {
+			return false
+		}
+		for _, holders := range addProviderRpcs {
+			// Verify that all keys have been provided to exactly
+			// replicationFactor distinct peers.
+			if len(holders) != replicationFactor {
+				return false
+			}
+		}
+		return true
+	}
+	waitUntil(t, providedAllKeys, 200*time.Millisecond, "waiting for all keys to be provided")
+}
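
The tests above poll through the waitUntil helper that already exists in provider_test.go; it is referenced by this diff but not changed by it. A minimal sketch of the behavior these tests assume (the polling step and exact signature are assumptions, not part of the diff):

// waitUntil polls cond until it returns true or timeout elapses,
// failing the test with msg on timeout. Sketch only: the real helper
// lives elsewhere in provider_test.go.
func waitUntil(t *testing.T, cond func() bool, timeout time.Duration, msg string) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(time.Millisecond) // assumed polling step
	}
	t.Fatal(msg)
}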