diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml index 47531a8f3..778de6ed4 100644 --- a/.github/workflows/go-test.yml +++ b/.github/workflows/go-test.yml @@ -16,7 +16,5 @@ concurrency: jobs: go-test: uses: ipdxco/unified-github-workflows/.github/workflows/go-test.yml@v1.0 - with: - go-versions: '["1.23.x", "1.24.x"]' secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/go.mod b/go.mod index 104ee90ab..5ee0d2bea 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,8 @@ module github.com/libp2p/go-libp2p-kad-dht -go 1.23.10 +go 1.24 require ( - github.com/filecoin-project/go-clock v0.1.0 github.com/gammazero/deque v1.0.0 github.com/google/gopacket v1.1.19 github.com/google/uuid v1.6.0 @@ -48,6 +47,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect + github.com/filecoin-project/go-clock v0.1.0 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/go-logr/logr v1.4.3 // indirect diff --git a/provider/internal/connectivity/connectivity.go b/provider/internal/connectivity/connectivity.go index 63391c73d..495b9a32a 100644 --- a/provider/internal/connectivity/connectivity.go +++ b/provider/internal/connectivity/connectivity.go @@ -4,8 +4,6 @@ import ( "sync" "sync/atomic" "time" - - "github.com/filecoin-project/go-clock" ) const ( @@ -43,7 +41,6 @@ type ConnectivityChecker struct { online atomic.Bool - clock clock.Clock lastCheck time.Time onlineCheckInterval time.Duration // minimum check interval when online @@ -64,7 +61,6 @@ func New(checkFunc func() bool, opts ...Option) (*ConnectivityChecker, error) { c := &ConnectivityChecker{ done: make(chan struct{}), checkFunc: checkFunc, - clock: cfg.clock, onlineCheckInterval: cfg.onlineCheckInterval, onOffline: cfg.onOffline, onOnline: cfg.onOnline, @@ -139,7 +135,7 @@ func (c *ConnectivityChecker) TriggerCheck() { c.mutex.Unlock() return } - if c.online.Load() && c.clock.Now().Sub(c.lastCheck) < c.onlineCheckInterval { + if c.online.Load() && time.Since(c.lastCheck) < c.onlineCheckInterval { c.mutex.Unlock() return // last check was too recent } @@ -148,7 +144,7 @@ func (c *ConnectivityChecker) TriggerCheck() { defer c.mutex.Unlock() if c.checkFunc() { - c.lastCheck = c.clock.Now() + c.lastCheck = time.Now() return } @@ -165,13 +161,20 @@ func (c *ConnectivityChecker) TriggerCheck() { func (c *ConnectivityChecker) probeLoop(init bool) { var offlineC <-chan time.Time if !init { - offlineTimer := c.clock.Timer(c.offlineDelay) - defer offlineTimer.Stop() - offlineC = offlineTimer.C + if c.offlineDelay == 0 { + if c.onOffline != nil { + // Online -> Offline + c.onOffline() + } + } else { + offlineTimer := time.NewTimer(c.offlineDelay) + defer offlineTimer.Stop() + offlineC = offlineTimer.C + } } delay := initialBackoffDelay - timer := c.clock.Timer(delay) + timer := time.NewTimer(delay) defer timer.Stop() for { select { @@ -202,7 +205,7 @@ func (c *ConnectivityChecker) probe() bool { // Node is back Online. c.online.Store(true) - c.lastCheck = c.clock.Now() + c.lastCheck = time.Now() if c.onOnline != nil { c.onOnline() } diff --git a/provider/internal/connectivity/connectivity_test.go b/provider/internal/connectivity/connectivity_test.go index c7417a1a4..c30de2f2c 100644 --- a/provider/internal/connectivity/connectivity_test.go +++ b/provider/internal/connectivity/connectivity_test.go @@ -1,676 +1,409 @@ +//go:build go1.25 +// +build go1.25 + package connectivity import ( - "sync" "sync/atomic" "testing" + "testing/synctest" "time" - "github.com/filecoin-project/go-clock" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func eventually(t *testing.T, condition func() bool, maxDelay time.Duration, args ...any) { - step := time.Millisecond - for range maxDelay / step { - if condition() { - return - } - time.Sleep(step) - } - t.Fatal(args...) -} - -func TestConnectivityChecker_New(t *testing.T) { - t.Parallel() - - t.Run("starts offline when checkFunc returns false", func(t *testing.T) { - checkFunc := func() bool { return false } +var ( + onlineCheckFunc = func() bool { return true } + offlineCheckFunc = func() bool { return false } +) - checker, err := New(checkFunc) +func TestNewConnectiviyChecker(t *testing.T) { + t.Run("initial state is offline", func(t *testing.T) { + connChecker, err := New(onlineCheckFunc) require.NoError(t, err) - defer checker.Close() - - // Give some time for initialization - eventually(t, func() bool { return !checker.IsOnline() }, 20*time.Millisecond, "checker should be offline") + defer connChecker.Close() - assert.False(t, checker.IsOnline()) + require.False(t, connChecker.IsOnline()) }) - t.Run("starts online when checkFunc returns true", func(t *testing.T) { - checkFunc := func() bool { return true } + t.Run("start online", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + onlineChan := make(chan struct{}) + onOnline := func() { close(onlineChan) } - checker, err := New(checkFunc) - require.NoError(t, err) - checker.Start() - defer checker.Close() + connChecker, err := New(onlineCheckFunc, + WithOnOnline(onOnline), + ) + require.NoError(t, err) + defer connChecker.Close() - // Give some time for initialization - eventually(t, checker.IsOnline, 20*time.Millisecond, "checker should be online") - - require.True(t, checker.IsOnline()) - }) + require.False(t, connChecker.IsOnline()) - t.Run("with custom options", func(t *testing.T) { - mockClock := clock.NewMock() - checkFunc := func() bool { return true } + connChecker.Start() - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnlineCheckInterval(30*time.Second), - ) - require.NoError(t, err) - defer checker.Close() + <-onlineChan // wait for onOnline to be run + synctest.Wait() - assert.NotNil(t, checker) + require.True(t, connChecker.IsOnline()) + }) }) - t.Run("with invalid options", func(t *testing.T) { - checkFunc := func() bool { return true } - - _, err := New(checkFunc, - WithOnlineCheckInterval(-1*time.Second), - ) - assert.Error(t, err) + t.Run("start offline", func(t *testing.T) { + onlineCount, offlineCount := atomic.Int32{}, atomic.Int32{} + onOnline := func() { onlineCount.Add(1) } + onOffline := func() { offlineCount.Add(1) } - _, err = New(checkFunc, - WithOfflineDelay(-1*time.Second), + connChecker, err := New(offlineCheckFunc, + WithOnOnline(onOnline), + WithOnOffline(onOffline), ) - assert.Error(t, err) - }) -} - -func TestConnectivityChecker_Close(t *testing.T) { - t.Parallel() - - t.Run("close stops all operations", func(t *testing.T) { - checkFunc := func() bool { return false } - - checker, err := New(checkFunc) require.NoError(t, err) + defer connChecker.Close() - err = checker.Close() - assert.NoError(t, err) + require.False(t, connChecker.IsOnline()) - // Multiple closes should not panic - err = checker.Close() - assert.NoError(t, err) - }) - - t.Run("operations after close are ignored", func(t *testing.T) { - checkFunc := func() bool { return true } - - checker, err := New(checkFunc) - require.NoError(t, err) - - checker.Close() + connChecker.Start() - // TriggerCheck should be ignored after close - checker.TriggerCheck() + require.False(t, connChecker.mutex.TryLock()) // node probing until it comes online - // State should still be accessible - _ = checker.IsOnline() + require.False(t, connChecker.IsOnline()) + require.Equal(t, int32(0), onlineCount.Load()) + require.Equal(t, int32(0), offlineCount.Load()) }) } -func TestConnectivityChecker_TriggerCheck_WithMockClock(t *testing.T) { - t.Parallel() - - t.Run("ignores check when interval not passed", func(t *testing.T) { - mockClock := clock.NewMock() - var checkCount atomic.Int32 - - checkFunc := func() bool { - checkCount.Add(1) - return true - } - - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnlineCheckInterval(1*time.Minute), - ) - require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initial check - time.Sleep(10 * time.Millisecond) - - initialCount := checkCount.Load() - - // Trigger check immediately - should be ignored - checker.TriggerCheck() - time.Sleep(10 * time.Millisecond) - - assert.Equal(t, initialCount, checkCount.Load()) - - // Advance clock and trigger check - should work - mockClock.Add(2 * time.Minute) - checker.TriggerCheck() - time.Sleep(10 * time.Millisecond) - - assert.Greater(t, checkCount.Load(), initialCount) +func TestStateTransitions(t *testing.T) { + t.Run("offline to online", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + checkInterval := time.Second + offlineDelay := time.Minute + + online := atomic.Bool{} // start offline + checkFunc := func() bool { return online.Load() } + + onlineChan, offlineChan := make(chan struct{}), make(chan struct{}) + onOnline := func() { close(onlineChan) } + onOffline := func() { close(offlineChan) } + + connChecker, err := New(checkFunc, + WithOfflineDelay(offlineDelay), + WithOnlineCheckInterval(checkInterval), + WithOnOnline(onOnline), + WithOnOffline(onOffline), + ) + require.NoError(t, err) + defer connChecker.Close() + + require.False(t, connChecker.IsOnline()) + connChecker.Start() + + time.Sleep(initialBackoffDelay) + + online.Store(true) + + <-onlineChan // wait for onOnline to be run + require.True(t, connChecker.IsOnline()) + select { + case <-offlineChan: + require.FailNow(t, "onOffline shouldn't have been called") + default: + } + }) }) - t.Run("transitions from online to offline", func(t *testing.T) { - var isOnline atomic.Bool - isOnline.Store(true) - - checkFunc := func() bool { - return isOnline.Load() - } - - var onlineCallCount, offlineCallCount atomic.Int32 - onlineNotify := func() { onlineCallCount.Add(1) } - offlineNotify := func() { offlineCallCount.Add(1) } - - checker, err := New(checkFunc, - WithOnlineCheckInterval(10*time.Millisecond), // Very short interval - WithOfflineDelay(100*time.Millisecond), // Short delay for testing - WithOnOnline(onlineNotify), - WithOnOffline(offlineNotify), - ) - require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initialization - should be online - time.Sleep(30 * time.Millisecond) - assert.True(t, checker.IsOnline()) - - // Make checkFunc return false and trigger check - isOnline.Store(false) - - // Wait for online check interval to pass before triggering - time.Sleep(20 * time.Millisecond) - checker.TriggerCheck() - time.Sleep(30 * time.Millisecond) - - // Should be in Disconnected state - require.False(t, checker.IsOnline()) - assert.Equal(t, offlineCallCount.Load(), int32(0)) - - // Wait for offline delay to pass - time.Sleep(150 * time.Millisecond) - - // Should be offline and callback should be called - require.False(t, checker.IsOnline()) - assert.Greater(t, offlineCallCount.Load(), int32(0)) + t.Run("online to disconnected to offline", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + checkInterval := time.Second + offlineDelay := time.Minute + + online := atomic.Bool{} + online.Store(true) + checkFunc := func() bool { return online.Load() } + + onlineChan, offlineChan := make(chan struct{}), make(chan struct{}) + onOnline := func() { close(onlineChan) } + onOffline := func() { close(offlineChan) } + + connChecker, err := New(checkFunc, + WithOfflineDelay(offlineDelay), + WithOnlineCheckInterval(checkInterval), + WithOnOnline(onOnline), + WithOnOffline(onOffline), + ) + require.NoError(t, err) + defer connChecker.Close() + + require.False(t, connChecker.IsOnline()) + connChecker.Start() + + <-onlineChan // wait for onOnline to be run + require.True(t, connChecker.IsOnline()) + require.Equal(t, time.Now(), connChecker.lastCheck) + + online.Store(false) + // Cannot trigger check yet + connChecker.TriggerCheck() + require.True(t, connChecker.mutex.TryLock()) // node still online + connChecker.mutex.Unlock() + + time.Sleep(checkInterval - time.Millisecond) + connChecker.TriggerCheck() + require.True(t, connChecker.mutex.TryLock()) // node still online + connChecker.mutex.Unlock() + + time.Sleep(time.Millisecond) + connChecker.TriggerCheck() + require.False(t, connChecker.mutex.TryLock()) + + synctest.Wait() + + require.False(t, connChecker.IsOnline()) + select { + case <-offlineChan: + require.FailNow(t, "onOffline shouldn't have been called") + default: // Disconnected but not Offline + } + + connChecker.TriggerCheck() // noop since Disconnected + require.False(t, connChecker.mutex.TryLock()) + + time.Sleep(offlineDelay) + + require.False(t, connChecker.IsOnline()) + <-offlineChan // wait for callback to be run + + connChecker.TriggerCheck() // noop since Offline + require.False(t, connChecker.mutex.TryLock()) + }) }) - t.Run("transitions from offline to online", func(t *testing.T) { - mockClock := clock.NewMock() - var isOnline atomic.Bool - isOnline.Store(false) - - checkFunc := func() bool { - return isOnline.Load() - } - - var onlineCallCount atomic.Int32 - onlineNotify := func() { onlineCallCount.Add(1) } - - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnOnline(onlineNotify), - ) - require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initialization - time.Sleep(10 * time.Millisecond) - assert.False(t, checker.IsOnline()) - - // Make checkFunc return true - isOnline.Store(true) - - // Advance clock to trigger offline check - mockClock.Add(1 * time.Minute) - time.Sleep(50 * time.Millisecond) - - // Should be online and callback should be called - assert.True(t, checker.IsOnline()) - assert.Greater(t, onlineCallCount.Load(), int32(0)) + t.Run("remain online", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + checkInterval := time.Second + offlineDelay := time.Minute + + online := atomic.Bool{} + online.Store(true) + checkCount := atomic.Int32{} + checkFunc := func() bool { checkCount.Add(1); return online.Load() } + + onlineChan, offlineChan := make(chan struct{}), make(chan struct{}) + onOnline := func() { close(onlineChan) } + onOffline := func() { close(offlineChan) } + + connChecker, err := New(checkFunc, + WithOfflineDelay(offlineDelay), + WithOnlineCheckInterval(checkInterval), + WithOnOnline(onOnline), + WithOnOffline(onOffline), + ) + require.NoError(t, err) + defer connChecker.Close() + + require.False(t, connChecker.IsOnline()) + connChecker.Start() + + <-onlineChan + + require.True(t, connChecker.IsOnline()) + require.Equal(t, int32(1), checkCount.Load()) + require.Equal(t, time.Now(), connChecker.lastCheck) + + connChecker.TriggerCheck() // recent check, should be no-op + synctest.Wait() + require.Equal(t, int32(1), checkCount.Load()) + + time.Sleep(checkInterval - 1) + connChecker.TriggerCheck() // recent check, should be no-op + synctest.Wait() + require.Equal(t, int32(1), checkCount.Load()) + + time.Sleep(1) + connChecker.TriggerCheck() // checkInterval has passed, new check is run + synctest.Wait() + require.Equal(t, int32(2), checkCount.Load()) + require.Equal(t, time.Now(), connChecker.lastCheck) + + time.Sleep(checkInterval) + connChecker.TriggerCheck() // checkInterval has passed, new check is run + synctest.Wait() + require.Equal(t, int32(3), checkCount.Load()) + require.Equal(t, time.Now(), connChecker.lastCheck) + }) }) +} - t.Run("concurrent trigger checks", func(t *testing.T) { - mockClock := clock.NewMock() - var checkCount atomic.Int32 - - checkFunc := func() bool { - checkCount.Add(1) - time.Sleep(50 * time.Millisecond) // Simulate slow check - return true - } - - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnlineCheckInterval(1*time.Minute), +func TestSetCallbacks(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + // Callbacks MUST be set before calling Start() + oldOnlineCount, oldOfflineCount, newOnlineCount, newOfflineCount := atomic.Int32{}, atomic.Int32{}, atomic.Int32{}, atomic.Int32{} + onlineChan, offlineChan := make(chan struct{}), make(chan struct{}) + oldOnOnline := func() { oldOnlineCount.Add(1); close(onlineChan) } + oldOnOffline := func() { oldOfflineCount.Add(1); close(offlineChan) } + newOnOnline := func() { newOnlineCount.Add(1); close(onlineChan) } + newOnOffline := func() { newOfflineCount.Add(1); close(offlineChan) } + + checkInterval := time.Second + online := atomic.Bool{} + online.Store(true) + checkFunc := func() bool { return online.Load() } + + connChecker, err := New(checkFunc, + WithOnOnline(oldOnOnline), + WithOnOffline(oldOnOffline), + WithOfflineDelay(0), + WithOnlineCheckInterval(checkInterval), ) require.NoError(t, err) - defer checker.Close() - - // Wait for initialization to complete - time.Sleep(100 * time.Millisecond) // Longer wait to ensure initialization goroutine completes - initialCount := checkCount.Load() - - // Advance clock - mockClock.Add(2 * time.Minute) - - // Launch multiple concurrent checks - var wg sync.WaitGroup - for range 10 { - wg.Add(1) - go func() { - defer wg.Done() - checker.TriggerCheck() - }() - } - wg.Wait() + defer connChecker.Close() - // Wait for any ongoing checks to complete - time.Sleep(100 * time.Millisecond) + connChecker.SetCallbacks(newOnOnline, newOnOffline) - finalCount := checkCount.Load() + connChecker.Start() - // Only one additional check should have been performed (due to mutex) - // The mutex should prevent concurrent checks, so we should see exactly initialCount+1 or maybe +2 due to timing - assert.GreaterOrEqual(t, finalCount, initialCount+1) - assert.LessOrEqual(t, finalCount, initialCount+2) // Allow for slight timing variations - }) + <-onlineChan // wait for newOnOnline to be called + require.True(t, connChecker.IsOnline()) + require.Equal(t, int32(0), oldOnlineCount.Load()) + require.Equal(t, int32(1), newOnlineCount.Load()) - t.Run("probe loop with periodic checks", func(t *testing.T) { - mockClock := clock.NewMock() - var isOnline atomic.Bool - var checkCount atomic.Int32 - isOnline.Store(false) - - checkFunc := func() bool { - checkCount.Add(1) - return isOnline.Load() - } - - var onlineCallCount atomic.Int32 - onlineNotify := func() { onlineCallCount.Add(1) } - - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnOnline(onlineNotify), - ) - require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initialization - time.Sleep(10 * time.Millisecond) - initialCheckCount := checkCount.Load() - - // Advance clock multiple times to trigger periodic checks - for range 3 { - mockClock.Add(31 * time.Second) - time.Sleep(10 * time.Millisecond) - } + // Wait until we can perform a new check + time.Sleep(checkInterval) - // Multiple checks should have been performed - assert.Greater(t, checkCount.Load(), initialCheckCount) + // Go offline + online.Store(false) + connChecker.TriggerCheck() + require.False(t, connChecker.mutex.TryLock()) // node probing until it comes online - // Now make it go online - isOnline.Store(true) - mockClock.Add(31 * time.Second) - time.Sleep(50 * time.Millisecond) - - // Should be online and callback called - assert.True(t, checker.IsOnline()) - assert.Greater(t, onlineCallCount.Load(), int32(0)) + <-offlineChan // wait for newOnOffline to be called + require.False(t, connChecker.IsOnline()) + require.Equal(t, int32(0), oldOfflineCount.Load()) + require.Equal(t, int32(1), newOfflineCount.Load()) }) } -func TestConnectivityChecker_StateTransitions(t *testing.T) { - t.Parallel() - - t.Run("all state values", func(t *testing.T) { - var isOnline atomic.Bool - isOnline.Store(true) - - checkFunc := func() bool { - return isOnline.Load() - } - - checker, err := New(checkFunc, - WithOnlineCheckInterval(10*time.Millisecond), // Very short interval to allow quick trigger - WithOfflineDelay(100*time.Millisecond), // Short delay for testing - ) +func TestExponentialBackoff(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + checkCount := atomic.Int32{} + checkFunc := func() bool { checkCount.Add(1); return false } + connChecker, err := New(checkFunc) require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initialization - should be Online - time.Sleep(30 * time.Millisecond) - assert.True(t, checker.IsOnline()) - - // Make offline and trigger check - should be Disconnected - isOnline.Store(false) + defer connChecker.Close() + + connChecker.Start() + require.False(t, connChecker.mutex.TryLock()) // node probing until it comes online + require.False(t, connChecker.IsOnline()) + + // Exponential backoff increase + expectedWait := initialBackoffDelay + expectedChecks := int32(1) // initial check + for expectedWait < maxBackoffDelay { + synctest.Wait() + require.Equal(t, expectedChecks, checkCount.Load()) + time.Sleep(expectedWait) + expectedChecks++ + expectedWait *= 2 + } - // Wait for online check interval to pass before triggering - time.Sleep(20 * time.Millisecond) - checker.TriggerCheck() - time.Sleep(30 * time.Millisecond) + // Reached max backoff delay + synctest.Wait() + require.Equal(t, expectedChecks, checkCount.Load()) - assert.False(t, checker.IsOnline()) + time.Sleep(maxBackoffDelay) + expectedChecks++ + synctest.Wait() + require.Equal(t, expectedChecks, checkCount.Load()) - // Wait beyond offline delay - should be Offline - time.Sleep(150 * time.Millisecond) - assert.False(t, checker.IsOnline()) + time.Sleep(3 * maxBackoffDelay) + expectedChecks += 3 + synctest.Wait() + require.Equal(t, expectedChecks, checkCount.Load()) }) } -func TestConnectivityChecker_Callbacks(t *testing.T) { - t.Parallel() - - t.Run("callbacks are called appropriately", func(t *testing.T) { - var isOnline atomic.Bool - isOnline.Store(true) // Start online first - - checkFunc := func() bool { - return isOnline.Load() - } - - var onlineCallCount, offlineCallCount atomic.Int32 - onlineNotify := func() { onlineCallCount.Add(1) } - offlineNotify := func() { offlineCallCount.Add(1) } - - checker, err := New(checkFunc, - WithOnlineCheckInterval(10*time.Millisecond), // Very short interval - WithOfflineDelay(100*time.Millisecond), // Short delay for testing - WithOnOnline(onlineNotify), - WithOnOffline(offlineNotify), - ) - require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initialization - should be online - time.Sleep(30 * time.Millisecond) - assert.True(t, checker.IsOnline()) - - // Go offline and trigger a check to start the offline transition - isOnline.Store(false) - - // Wait for online check interval to pass before triggering - time.Sleep(20 * time.Millisecond) - checker.TriggerCheck() - - // Wait for offline delay to pass and offline callback to be called - time.Sleep(150 * time.Millisecond) - - // Should have called offline callback - assert.Greater(t, offlineCallCount.Load(), int32(0)) - - // Go online - isOnline.Store(true) - time.Sleep(50 * time.Millisecond) - - // Should have called online callback - assert.Greater(t, onlineCallCount.Load(), int32(0)) +func TestInvalidOptions(t *testing.T) { + t.Run("negative online check interval", func(t *testing.T) { + _, err := New(onlineCheckFunc, WithOnlineCheckInterval(-1)) + require.Error(t, err) }) - t.Run("panic in callback doesn't break checker", func(t *testing.T) { - var isOnline atomic.Bool - isOnline.Store(false) - - checkFunc := func() bool { - return isOnline.Load() - } - - safeCallback := func() { - defer func() { - if r := recover(); r != nil { - // Expected panic, recover and continue - } - }() - panic("test panic") - } - - // This should not panic during construction or operation - checker, err := New(checkFunc, - WithOnOnline(safeCallback), - WithOnOffline(safeCallback), - WithOfflineDelay(50*time.Millisecond), - ) - require.NoError(t, err) - defer checker.Close() - - // Wait for potential panics during initialization - time.Sleep(100 * time.Millisecond) - - // Go online - should not panic the test - isOnline.Store(true) - time.Sleep(100 * time.Millisecond) - - // Checker should still be functional - assert.NotPanics(t, func() { - checker.IsOnline() - }) + t.Run("negative offline delay", func(t *testing.T) { + _, err := New(onlineCheckFunc, WithOfflineDelay(-1*time.Hour)) + require.Error(t, err) }) } -func TestConnectivityChecker_EdgeCases(t *testing.T) { - t.Parallel() - - t.Run("close during probe loop", func(t *testing.T) { - var checkCount atomic.Int32 - checkFunc := func() bool { - checkCount.Add(1) - return false - } - - checker, err := New(checkFunc) +func TestClose(t *testing.T) { + t.Run("close while offline", func(t *testing.T) { + connChecker, err := New(offlineCheckFunc) require.NoError(t, err) - checker.Start() - - // Let it run for a bit - time.Sleep(50 * time.Millisecond) - initialCount := checkCount.Load() - - // Close and verify no more checks happen - checker.Close() - time.Sleep(50 * time.Millisecond) + defer connChecker.Close() - // Should have stopped checking - finalCount := checkCount.Load() - time.Sleep(50 * time.Millisecond) - laterCount := checkCount.Load() + connChecker.Start() + require.False(t, connChecker.mutex.TryLock()) // node probing until it comes online + require.False(t, connChecker.IsOnline()) - assert.Greater(t, initialCount, int32(0)) - assert.Equal(t, finalCount, laterCount) // No new checks after close - }) - - t.Run("trigger check after close", func(t *testing.T) { - var checkCount atomic.Int32 - checkFunc := func() bool { - checkCount.Add(1) - return true - } - - checker, err := New(checkFunc) + err = connChecker.Close() require.NoError(t, err) - time.Sleep(10 * time.Millisecond) - checker.Close() - - initialCount := checkCount.Load() - - // Should be ignored - checker.TriggerCheck() - time.Sleep(10 * time.Millisecond) - - assert.Equal(t, initialCount, checkCount.Load()) + require.True(t, connChecker.mutex.TryLock()) + connChecker.mutex.Unlock() }) - t.Run("high frequency operations", func(t *testing.T) { - mockClock := clock.NewMock() - var isOnline atomic.Bool - isOnline.Store(true) - - checkFunc := func() bool { - return isOnline.Load() - } - - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnlineCheckInterval(100*time.Millisecond), + t.Run("close while online", func(t *testing.T) { + onlineChan := make(chan struct{}) + onOnline := func() { close(onlineChan) } + connChecker, err := New(onlineCheckFunc, + WithOnOnline(onOnline), ) require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Rapid state checks should work - for range 100 { - go func() { - checker.IsOnline() - }() - } - - // Rapid trigger checks with clock advancement - for range 10 { - mockClock.Add(200 * time.Millisecond) - checker.TriggerCheck() - } + defer connChecker.Close() - time.Sleep(100 * time.Millisecond) + connChecker.Start() + <-onlineChan + require.True(t, connChecker.IsOnline()) - // Should still be functional - assert.True(t, checker.IsOnline()) + connChecker.Close() }) -} - -func TestConnectivityChecker_Options(t *testing.T) { - t.Parallel() - t.Run("WithOfflineDelay", func(t *testing.T) { - // Use real time for this test since mock clock with timers is complex - var isOnline atomic.Bool - isOnline.Store(true) + t.Run("SetCallbacks after Close", func(t *testing.T) { + onlineChan, offlineChan := make(chan struct{}), make(chan struct{}) + onOnline := func() { close(onlineChan) } + onOffline := func() { close(offlineChan) } - checkFunc := func() bool { - return isOnline.Load() - } - - var offlineCallCount atomic.Int32 - offlineNotify := func() { offlineCallCount.Add(1) } - - checker, err := New(checkFunc, - WithOnlineCheckInterval(10*time.Millisecond), // Very short interval - WithOfflineDelay(100*time.Millisecond), // Short delay for testing - WithOnOffline(offlineNotify), - ) + connChecker, err := New(offlineCheckFunc) require.NoError(t, err) - checker.Start() - defer checker.Close() + defer connChecker.Close() - // Wait for initialization - should be online - time.Sleep(30 * time.Millisecond) - assert.True(t, checker.IsOnline()) + require.Nil(t, connChecker.onOffline) + require.Nil(t, connChecker.onOnline) - // Go offline and trigger check - isOnline.Store(false) + connChecker.Close() + connChecker.SetCallbacks(onOnline, onOffline) - // Wait for online check interval to pass before triggering - time.Sleep(20 * time.Millisecond) - checker.TriggerCheck() - time.Sleep(30 * time.Millisecond) - - // Should be disconnected, not offline yet - require.False(t, checker.IsOnline()) - assert.Equal(t, int32(0), offlineCallCount.Load()) - - // Wait for offline delay to pass - time.Sleep(150 * time.Millisecond) - - // Now offline - require.False(t, checker.IsOnline()) - assert.Greater(t, offlineCallCount.Load(), int32(0)) + // Assert that callbacks were NOT set + require.Nil(t, connChecker.onOffline) + require.Nil(t, connChecker.onOnline) }) - t.Run("zero offline delay", func(t *testing.T) { - var isOnline atomic.Bool - isOnline.Store(true) - - checkFunc := func() bool { - return isOnline.Load() - } - - var offlineCallCount atomic.Int32 - offlineNotify := func() { offlineCallCount.Add(1) } - - checker, err := New(checkFunc, - WithOnlineCheckInterval(10*time.Millisecond), // Very short interval - WithOfflineDelay(0), - WithOnOffline(offlineNotify), - ) + t.Run("TriggerCheck after Close", func(t *testing.T) { + connChecker, err := New(offlineCheckFunc) require.NoError(t, err) - checker.Start() - defer checker.Close() - - // Wait for initialization to complete - should be online - time.Sleep(20 * time.Millisecond) - assert.True(t, checker.IsOnline()) + defer connChecker.Close() - // Go offline and trigger check - isOnline.Store(false) + connChecker.Start() + require.False(t, connChecker.mutex.TryLock()) // node probing until it comes online + require.False(t, connChecker.IsOnline()) - // Wait for online check interval to pass before triggering - time.Sleep(20 * time.Millisecond) - checker.TriggerCheck() - - // Give time for state to change to Disconnected first - time.Sleep(20 * time.Millisecond) - - // With zero delay, should quickly transition to Offline - time.Sleep(50 * time.Millisecond) - - assert.False(t, checker.IsOnline()) - assert.Greater(t, offlineCallCount.Load(), int32(0)) - }) -} + err = connChecker.Close() + require.NoError(t, err) -// Benchmark tests to ensure performance -func BenchmarkConnectivityChecker_StateAccess(b *testing.B) { - checkFunc := func() bool { return true } + require.True(t, connChecker.mutex.TryLock()) + connChecker.mutex.Unlock() - checker, err := New(checkFunc) - require.NoError(b, err) - defer checker.Close() + connChecker.TriggerCheck() // noop since closed - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - checker.IsOnline() - } + require.True(t, connChecker.mutex.TryLock()) + connChecker.mutex.Unlock() + require.False(t, connChecker.IsOnline()) }) } - -func BenchmarkConnectivityChecker_TriggerCheck(b *testing.B) { - mockClock := clock.NewMock() - var checkCount atomic.Int32 - - checkFunc := func() bool { - checkCount.Add(1) - return true - } - - checker, err := New(checkFunc, - WithClock(mockClock), - WithOnlineCheckInterval(1*time.Nanosecond), // Allow rapid checks - ) - require.NoError(b, err) - defer checker.Close() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - mockClock.Add(1 * time.Nanosecond) - checker.TriggerCheck() - } -} diff --git a/provider/internal/connectivity/options.go b/provider/internal/connectivity/options.go index a2e5aa5ee..a67fe2a6a 100644 --- a/provider/internal/connectivity/options.go +++ b/provider/internal/connectivity/options.go @@ -3,12 +3,9 @@ package connectivity import ( "fmt" "time" - - "github.com/filecoin-project/go-clock" ) type config struct { - clock clock.Clock onlineCheckInterval time.Duration // minimum check interval when online offlineDelay time.Duration @@ -29,19 +26,11 @@ func (cfg *config) apply(opts ...Option) error { type Option func(opt *config) error var DefaultConfig = func(cfg *config) error { - cfg.clock = clock.New() cfg.onlineCheckInterval = 1 * time.Minute + cfg.offlineDelay = 2 * time.Hour return nil } -// WithClock sets the clock used by the connectivity checker. -func WithClock(c clock.Clock) Option { - return func(cfg *config) error { - cfg.clock = c - return nil - } -} - // WithOnlineCheckInterval sets the minimum interval between online checks. // This is used to throttle the number of connectivity checks when the node is // online. diff --git a/provider/options.go b/provider/options.go index a36d63485..891fbf1a1 100644 --- a/provider/options.go +++ b/provider/options.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/filecoin-project/go-clock" "github.com/libp2p/go-libp2p-kad-dht/amino" pb "github.com/libp2p/go-libp2p-kad-dht/pb" "github.com/libp2p/go-libp2p-kad-dht/provider/datastore" @@ -48,8 +47,6 @@ type config struct { selfAddrs func() []ma.Multiaddr addLocalRecord func(mh.Multihash) error - clock clock.Clock - maxWorkers int dedicatedPeriodicWorkers int dedicatedBurstWorkers int @@ -93,8 +90,6 @@ var DefaultConfig = func(cfg *config) error { cfg.offlineDelay = DefaultOfflineDelay cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval - cfg.clock = clock.New() - cfg.maxWorkers = 4 cfg.dedicatedPeriodicWorkers = 2 cfg.dedicatedBurstWorkers = 1 @@ -216,15 +211,6 @@ func WithAddLocalRecord(f func(mh.Multihash) error) Option { } } -// WithClock sets the clock used by the provider. This is useful for testing -// purposes, allowing to control time in tests. -func WithClock(c clock.Clock) Option { - return func(cfg *config) error { - cfg.clock = c - return nil - } -} - // WithMaxWorkers sets the maximum number of workers that can be used for // provide and reprovide jobs. The job of a worker is to explore a region of // the keyspace and (re)provide the keys matching the region to the closest diff --git a/provider/provider.go b/provider/provider.go index f4092d39e..7c03ebe65 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - "github.com/filecoin-project/go-clock" pool "github.com/guillaumemichel/reservedpool" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" @@ -130,7 +129,6 @@ type SweepingProvider struct { workerPool *pool.Pool[workerType] maxProvideConnsPerWorker int - clock clock.Clock cycleStart time.Time reprovideInterval time.Duration maxReprovideDelay time.Duration @@ -138,7 +136,7 @@ type SweepingProvider struct { schedule *trie.Trie[bitstr.Key, time.Duration] scheduleLk sync.Mutex scheduleCursor bitstr.Key - scheduleTimer *clock.Timer + scheduleTimer *time.Timer scheduleTimerStartedAt time.Time ongoingReprovides *trie.Trie[bitstr.Key, struct{}] @@ -196,7 +194,6 @@ func New(opts ...Option) (*SweepingProvider, error) { peers, err := cfg.router.GetClosestPeers(ctx, string(cfg.peerid)) return err == nil && len(peers) > 0 }, - connectivity.WithClock(cfg.clock), connectivity.WithOfflineDelay(cfg.offlineDelay), connectivity.WithOnlineCheckInterval(cfg.connectivityCheckOnlineInterval), ) @@ -232,8 +229,7 @@ func New(opts ...Option) (*SweepingProvider, error) { avgPrefixLenValidity: defaultPrefixLenValidity, cachedAvgPrefixLen: -1, - clock: cfg.clock, - cycleStart: cfg.clock.Now(), + cycleStart: time.Now(), msgSender: cfg.msgSender, getSelfAddrs: cfg.selfAddrs, @@ -242,7 +238,7 @@ func New(opts ...Option) (*SweepingProvider, error) { keyStore: cfg.keyStore, schedule: trie.New[bitstr.Key, time.Duration](), - scheduleTimer: cfg.clock.Timer(time.Hour), + scheduleTimer: time.NewTimer(time.Hour), provideQueue: queue.NewProvideQueue(), reprovideQueue: queue.NewReprovideQueue(), @@ -331,7 +327,7 @@ func (s *SweepingProvider) closed() bool { func (s *SweepingProvider) scheduleNextReprovideNoLock(prefix bitstr.Key, timeUntilReprovide time.Duration) { s.scheduleCursor = prefix s.scheduleTimer.Reset(timeUntilReprovide) - s.scheduleTimerStartedAt = s.clock.Now() + s.scheduleTimerStartedAt = time.Now() } func (s *SweepingProvider) reschedulePrefix(prefix bitstr.Key) { @@ -408,7 +404,7 @@ func (s *SweepingProvider) unscheduleSubsumedPrefixesNoLock(prefix bitstr.Key) { // currentTimeOffset returns the current time offset in the reprovide cycle. func (s *SweepingProvider) currentTimeOffset() time.Duration { - return s.timeOffset(s.clock.Now()) + return s.timeOffset(time.Now()) } // timeOffset returns the time offset in the reprovide cycle for the given @@ -503,7 +499,7 @@ func (s *SweepingProvider) approxPrefixLen() { } s.connectivity.TriggerCheck() - s.clock.Sleep(time.Second) // retry every second until success + time.Sleep(time.Second) // retry every second until success } }() } @@ -519,7 +515,7 @@ func (s *SweepingProvider) approxPrefixLen() { s.cachedAvgPrefixLen = int(cplSum.Load() / nSamples) } logger.Debugf("prefix len approximation is %d", s.cachedAvgPrefixLen) - s.lastAvgPrefixLen = s.clock.Now() + s.lastAvgPrefixLen = time.Now() } // getAvgPrefixLenNoLock returns the average prefix length of all scheduled @@ -535,7 +531,7 @@ func (s *SweepingProvider) getAvgPrefixLenNoLock() (int, error) { return -1, ErrOffline } - if s.lastAvgPrefixLen.Add(s.avgPrefixLenValidity).After(s.clock.Now()) { + if s.lastAvgPrefixLen.Add(s.avgPrefixLenValidity).After(time.Now()) { // Return cached value if it is still valid. return s.cachedAvgPrefixLen, nil } @@ -547,7 +543,7 @@ func (s *SweepingProvider) getAvgPrefixLenNoLock() (int, error) { prefixLenSum += len(entry.Key) } s.cachedAvgPrefixLen = prefixLenSum / len(scheduleEntries) - s.lastAvgPrefixLen = s.clock.Now() + s.lastAvgPrefixLen = time.Now() } return s.cachedAvgPrefixLen, nil } @@ -709,7 +705,7 @@ func (s *SweepingProvider) sendProviderRecords(keysAllocations map[peer.ID][]mh. if nPeers == 0 { return nil } - startTime := s.clock.Now() + startTime := time.Now() errCount := atomic.Uint32{} nWorkers := s.maxProvideConnsPerWorker jobChan := make(chan provideJob, nWorkers) @@ -736,7 +732,7 @@ func (s *SweepingProvider) sendProviderRecords(keysAllocations map[peer.ID][]mh. wg.Wait() errCountLoaded := int(errCount.Load()) - logger.Infof("sent provider records to peers in %s, errors %d/%d", s.clock.Since(startTime), errCountLoaded, len(keysAllocations)) + logger.Infof("sent provider records to peers in %s, errors %d/%d", time.Since(startTime), errCountLoaded, len(keysAllocations)) if errCountLoaded == nPeers || errCountLoaded > int(float32(nPeers)*(1-minimalRegionReachablePeersRatio)) { return fmt.Errorf("unable to provide to enough peers (%d/%d)", nPeers-errCountLoaded, nPeers) @@ -814,7 +810,7 @@ func (s *SweepingProvider) handleReprovide() { timeSinceTimerRunning := s.timeBetween(s.timeOffset(s.scheduleTimerStartedAt), currentTimeOffset) timeSinceTimerUntilNext := s.timeBetween(s.timeOffset(s.scheduleTimerStartedAt), next.Data) - if s.scheduleTimerStartedAt.Add(s.reprovideInterval).Before(s.clock.Now()) { + if s.scheduleTimerStartedAt.Add(s.reprovideInterval).Before(time.Now()) { // Alarm was programmed more than reprovideInterval ago, which means that // no regions has been reprovided since. Add all regions to the reprovide // queue. This only happens if the main thread gets blocked for more than diff --git a/provider/provider_test.go b/provider/provider_test.go index 36455edb0..32f3bf9d4 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -1,3 +1,6 @@ +//go:build go1.25 +// +build go1.25 + package provider import ( @@ -5,14 +8,15 @@ import ( "context" "crypto/sha256" "errors" + "slices" "strconv" "strings" "sync" "sync/atomic" "testing" + "testing/synctest" "time" - "github.com/filecoin-project/go-clock" "github.com/guillaumemichel/reservedpool" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" @@ -215,64 +219,64 @@ func TestReprovideTimeForPrefixWithCustomOrder(t *testing.T) { } func TestClosestPeersToPrefixRandom(t *testing.T) { - replicationFactor := 10 - nPeers := 128 - peers := random.Peers(nPeers) - peersTrie := trie.New[bit256.Key, peer.ID]() - for _, p := range peers { - peersTrie.Add(keyspace.PeerIDToBit256(p), p) - } + synctest.Test(t, func(t *testing.T) { + replicationFactor := 10 + nPeers := 128 + peers := random.Peers(nPeers) + peersTrie := trie.New[bit256.Key, peer.ID]() + for _, p := range peers { + peersTrie.Add(keyspace.PeerIDToBit256(p), p) + } - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) - return sortedPeers[:min(replicationFactor, len(peers))], nil - }, - } + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) + return sortedPeers[:min(replicationFactor, len(peers))], nil + }, + } - r := SweepingProvider{ - router: router, - replicationFactor: replicationFactor, - connectivity: noopConnectivityChecker(), - } - r.connectivity.Start() - defer r.connectivity.Close() - time.Sleep(5 * time.Millisecond) // give some time for connectivity to be set - - waitUntil(t, func() bool { return r.connectivity.IsOnline() }, time.Second, "connectivity should be online") - - for _, prefix := range []bitstr.Key{"", "0", "1", "00", "01", "10", "11", "000", "001", "010", "011", "100", "101", "110", "111"} { - closestPeers, err := r.closestPeersToPrefix(prefix) - require.NoError(t, err, "failed for prefix %s", prefix) - subtrieSize := 0 - currPrefix := prefix - // Reduce prefix if necessary as closestPeersToPrefix always returns at - // least replicationFactor peers if possible. - for { - subtrie, ok := keyspace.FindSubtrie(peersTrie, currPrefix) - require.True(t, ok) - subtrieSize = subtrie.Size() - if subtrieSize >= replicationFactor { - break + r := SweepingProvider{ + router: router, + replicationFactor: replicationFactor, + connectivity: noopConnectivityChecker(), + } + r.connectivity.Start() + defer r.connectivity.Close() + + synctest.Wait() + require.True(t, r.connectivity.IsOnline()) + + for _, prefix := range []bitstr.Key{"", "0", "1", "00", "01", "10", "11", "000", "001", "010", "011", "100", "101", "110", "111"} { + closestPeers, err := r.closestPeersToPrefix(prefix) + require.NoError(t, err, "failed for prefix %s", prefix) + subtrieSize := 0 + currPrefix := prefix + // Reduce prefix if necessary as closestPeersToPrefix always returns at + // least replicationFactor peers if possible. + for { + subtrie, ok := keyspace.FindSubtrie(peersTrie, currPrefix) + require.True(t, ok) + subtrieSize = subtrie.Size() + if subtrieSize >= replicationFactor { + break + } + currPrefix = currPrefix[:len(currPrefix)-1] } - currPrefix = currPrefix[:len(currPrefix)-1] + require.Len(t, closestPeers, subtrieSize, "prefix: %s", prefix) } - require.Len(t, closestPeers, subtrieSize, "prefix: %s", prefix) - } + }) } func TestGroupAndScheduleKeysByPrefix(t *testing.T) { - mockClock := clock.NewMock() prov := SweepingProvider{ order: bit256.ZeroKey(), - clock: mockClock, reprovideInterval: time.Hour, schedule: trie.New[bitstr.Key, time.Duration](), - scheduleTimer: mockClock.Timer(time.Hour), + scheduleTimer: time.NewTimer(time.Hour), cachedAvgPrefixLen: 3, - lastAvgPrefixLen: mockClock.Now(), + lastAvgPrefixLen: time.Now(), } mhs00000 := genMultihashesMatchingPrefix("00000", 3) @@ -392,18 +396,16 @@ func TestIndividualProvideSingle(t *testing.T) { return nil }, } - mockClock := clock.NewMock() r := SweepingProvider{ router: router, msgSender: msgSender, - clock: mockClock, reprovideInterval: time.Hour, maxProvideConnsPerWorker: 2, provideQueue: queue.NewProvideQueue(), reprovideQueue: queue.NewReprovideQueue(), connectivity: noopConnectivityChecker(), schedule: trie.New[bitstr.Key, time.Duration](), - scheduleTimer: mockClock.Timer(time.Hour), + scheduleTimer: time.NewTimer(time.Hour), getSelfAddrs: func() []ma.Multiaddr { return nil }, addLocalRecord: func(mh mh.Multihash) error { return nil }, provideCounter: provideCounter(), @@ -479,18 +481,16 @@ func TestIndividualProvideMultiple(t *testing.T) { return nil }, } - mockClock := clock.NewMock() r := SweepingProvider{ router: router, msgSender: msgSender, - clock: mockClock, reprovideInterval: time.Hour, maxProvideConnsPerWorker: 2, provideQueue: queue.NewProvideQueue(), reprovideQueue: queue.NewReprovideQueue(), connectivity: noopConnectivityChecker(), schedule: trie.New[bitstr.Key, time.Duration](), - scheduleTimer: mockClock.Timer(time.Hour), + scheduleTimer: time.NewTimer(time.Hour), getSelfAddrs: func() []ma.Multiaddr { return nil }, addLocalRecord: func(mh mh.Multihash) error { return nil }, provideCounter: provideCounter(), @@ -582,639 +582,637 @@ func TestIndividualProvideMultiple(t *testing.T) { require.True(t, r.provideQueue.IsEmpty()) } -func waitUntil(t *testing.T, condition func() bool, maxDelay time.Duration, args ...any) { - step := time.Millisecond - for range maxDelay / step { - if condition() { - return - } - time.Sleep(step) - } - t.Fatal(args...) -} - func TestHandleReprovide(t *testing.T) { - mockClock := clock.NewMock() - - online := atomic.Bool{} - online.Store(true) - connectivityCheckInterval := time.Second - offlineDelay := time.Minute - connChecker, err := connectivity.New( - func() bool { return online.Load() }, - connectivity.WithClock(mockClock), - connectivity.WithOfflineDelay(offlineDelay), - connectivity.WithOnlineCheckInterval(connectivityCheckInterval), - ) - require.NoError(t, err) - defer connChecker.Close() + synctest.Test(t, func(t *testing.T) { + online := atomic.Bool{} + online.Store(true) + connectivityCheckInterval := time.Second + offlineDelay := time.Minute + connChecker, err := connectivity.New( + func() bool { return online.Load() }, + connectivity.WithOfflineDelay(offlineDelay), + connectivity.WithOnlineCheckInterval(connectivityCheckInterval), + ) + require.NoError(t, err) + defer connChecker.Close() - prov := SweepingProvider{ - order: bit256.ZeroKey(), + prov := SweepingProvider{ + order: bit256.ZeroKey(), - connectivity: connChecker, + connectivity: connChecker, - clock: mockClock, - cycleStart: mockClock.Now(), - scheduleTimer: mockClock.Timer(time.Hour), - schedule: trie.New[bitstr.Key, time.Duration](), + cycleStart: time.Now(), + scheduleTimer: time.NewTimer(time.Hour), + schedule: trie.New[bitstr.Key, time.Duration](), - reprovideQueue: queue.NewReprovideQueue(), - workerPool: reservedpool.New[workerType](1, nil), // single worker + reprovideQueue: queue.NewReprovideQueue(), + workerPool: reservedpool.New[workerType](1, nil), // single worker - reprovideInterval: time.Minute, - maxReprovideDelay: 5 * time.Second, + reprovideInterval: time.Minute, + maxReprovideDelay: 5 * time.Second, - getSelfAddrs: func() []ma.Multiaddr { return nil }, - } - prov.scheduleTimer.Stop() - connChecker.Start() - defer connChecker.Close() - - prefixes := []bitstr.Key{ - "00", - "10", - "11", - } + getSelfAddrs: func() []ma.Multiaddr { return nil }, + } + prov.scheduleTimer.Stop() + connChecker.Start() + defer connChecker.Close() + + prefixes := []bitstr.Key{ + "00", + "10", + "11", + } - // Empty schedule -> early return - prov.handleReprovide() - require.Zero(t, prov.scheduleCursor) - - // Single prefix in schedule - prov.schedule.Add(prefixes[0], prov.reprovideTimeForPrefix(prefixes[0])) - prov.scheduleCursor = prefixes[0] - prov.handleReprovide() - require.Equal(t, prefixes[0], prov.scheduleCursor) - - // Two prefixes in schedule - mockClock.Add(1) - prov.schedule.Add(prefixes[1], prov.reprovideTimeForPrefix(prefixes[1])) - prov.handleReprovide() // reprovides prefixes[0], set scheduleCursor to prefixes[1] - require.Equal(t, prefixes[1], prov.scheduleCursor) - - // Wait more than reprovideInterval to call handleReprovide again. - // All prefixes should be added to the reprovide queue. - mockClock.Add(prov.reprovideInterval + 1) - require.True(t, prov.reprovideQueue.IsEmpty()) - prov.handleReprovide() - require.Equal(t, prefixes[1], prov.scheduleCursor) - - require.Equal(t, 2, prov.reprovideQueue.Size()) - dequeued, ok := prov.reprovideQueue.Dequeue() - require.True(t, ok) - require.Equal(t, prefixes[0], dequeued) - dequeued, ok = prov.reprovideQueue.Dequeue() - require.True(t, ok) - require.Equal(t, prefixes[1], dequeued) - require.True(t, prov.reprovideQueue.IsEmpty()) - - // Go in time past prefixes[1] and prefixes[2] - prov.schedule.Add(prefixes[2], prov.reprovideTimeForPrefix(prefixes[2])) - mockClock.Add(3 * prov.reprovideInterval / 4) - // reprovides prefixes[1], add prefixes[2] to reprovide queue, set - // scheduleCursor to prefixes[0] - prov.handleReprovide() - require.Equal(t, prefixes[0], prov.scheduleCursor) - - require.Equal(t, 1, prov.reprovideQueue.Size()) - dequeued, ok = prov.reprovideQueue.Dequeue() - require.True(t, ok) - require.Equal(t, prefixes[2], dequeued) - require.True(t, prov.reprovideQueue.IsEmpty()) - - mockClock.Add(prov.reprovideInterval / 4) - - // Node goes offline -> prefixes are queued - online.Store(false) - prov.connectivity.TriggerCheck() - waitUntil(t, func() bool { return !prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be disconnected") - // require.True(t, prov.reprovideQueue.IsEmpty()) - prov.handleReprovide() - // require.Equal(t, 1, prov.reprovideQueue.Size()) - - // Node comes back online - online.Store(true) - mockClock.Add(connectivityCheckInterval) - waitUntil(t, func() bool { return prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be online") + // Empty schedule -> early return + prov.handleReprovide() + require.Zero(t, prov.scheduleCursor) + + // Single prefix in schedule + prov.schedule.Add(prefixes[0], prov.reprovideTimeForPrefix(prefixes[0])) + prov.scheduleCursor = prefixes[0] + prov.handleReprovide() + require.Equal(t, prefixes[0], prov.scheduleCursor) + + // Two prefixes in schedule + time.Sleep(1) // advance 1 tick into the reprovide cycle + prov.schedule.Add(prefixes[1], prov.reprovideTimeForPrefix(prefixes[1])) + prov.handleReprovide() // reprovides prefixes[0], set scheduleCursor to prefixes[1] + require.Equal(t, prefixes[1], prov.scheduleCursor) + + // Wait more than reprovideInterval to call handleReprovide again. + // All prefixes should be added to the reprovide queue. + time.Sleep(prov.reprovideInterval + 1) + require.True(t, prov.reprovideQueue.IsEmpty()) + prov.handleReprovide() + require.Equal(t, prefixes[1], prov.scheduleCursor) + + require.Equal(t, 2, prov.reprovideQueue.Size()) + dequeued, ok := prov.reprovideQueue.Dequeue() + require.True(t, ok) + require.Equal(t, prefixes[0], dequeued) + dequeued, ok = prov.reprovideQueue.Dequeue() + require.True(t, ok) + require.Equal(t, prefixes[1], dequeued) + require.True(t, prov.reprovideQueue.IsEmpty()) + + // Go in time past prefixes[1] and prefixes[2] + prov.schedule.Add(prefixes[2], prov.reprovideTimeForPrefix(prefixes[2])) + time.Sleep(3 * prov.reprovideInterval / 4) + // reprovides prefixes[1], add prefixes[2] to reprovide queue, set + // scheduleCursor to prefixes[0] + prov.handleReprovide() + require.Equal(t, prefixes[0], prov.scheduleCursor) + + require.Equal(t, 1, prov.reprovideQueue.Size()) + dequeued, ok = prov.reprovideQueue.Dequeue() + require.True(t, ok) + require.Equal(t, prefixes[2], dequeued) + require.True(t, prov.reprovideQueue.IsEmpty()) + + time.Sleep(prov.reprovideInterval / 4) + + // Node goes offline -> prefixes are queued + online.Store(false) + prov.connectivity.TriggerCheck() + synctest.Wait() + require.False(t, prov.connectivity.IsOnline()) + require.True(t, prov.reprovideQueue.IsEmpty()) + prov.handleReprovide() + require.Equal(t, 1, prov.reprovideQueue.Size()) + + // Node comes back online + online.Store(true) + time.Sleep(connectivityCheckInterval) + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) + }) } func TestClose(t *testing.T) { - pid, err := peer.Decode("12BoooooPEER") - require.NoError(t, err) - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - if ctx.Err() != nil { - return nil, ctx.Err() - } - return []peer.ID{peer.ID("12BoooooPEER1"), peer.ID("12BoooooPEER2")}, nil - }, - } - msgSender := &mockMsgSender{ - sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { - if ctx.Err() != nil { - return ctx.Err() - } - return nil - }, - } - mockClock := clock.NewMock() - prov, err := New( - WithPeerID(pid), - WithClock(mockClock), - WithRouter(router), - WithMessageSender(msgSender), - WithReplicationFactor(1), - - WithMaxWorkers(4), - WithDedicatedBurstWorkers(0), - WithDedicatedPeriodicWorkers(0), - - WithSelfAddrs(func() []ma.Multiaddr { - addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") - require.NoError(t, err) - return []ma.Multiaddr{addr} - }), - ) - require.NoError(t, err) - - mhs := genMultihashes(128) - prov.StartProviding(false, mhs...) + synctest.Test(t, func(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") + require.NoError(t, err) + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + return []peer.ID{peer.ID("12BoooooPEER1"), peer.ID("12BoooooPEER2")}, nil + }, + } + msgSender := &mockMsgSender{ + sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { + if ctx.Err() != nil { + return ctx.Err() + } + return nil + }, + } + prov, err := New( + WithPeerID(pid), + WithRouter(router), + WithMessageSender(msgSender), + WithReplicationFactor(1), + + WithMaxWorkers(4), + WithDedicatedBurstWorkers(0), + WithDedicatedPeriodicWorkers(0), + + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + ) + require.NoError(t, err) - time.Sleep(20 * time.Millisecond) // leave time to perform provide - mockClock.Add(prov.reprovideInterval / 2) // some keys should have been reprovided - time.Sleep(20 * time.Millisecond) // leave time to perform reprovide + mhs := genMultihashes(128) + err = prov.StartProviding(false, mhs...) + require.NoError(t, err) + synctest.Wait() // wait for connectivity check + time.Sleep(prov.reprovideInterval / 2) // some keys should have been reprovided + synctest.Wait() - err = prov.Close() - require.NoError(t, err) + err = prov.Close() + require.NoError(t, err) - newMh := random.Multihashes(1)[0] + newMh := random.Multihashes(1)[0] - prov.StartProviding(false, newMh) - prov.StopProviding(newMh) - prov.ProvideOnce(newMh) - require.Equal(t, 0, prov.Clear()) + err = prov.StartProviding(false, newMh) + require.ErrorIs(t, err, ErrClosed) + err = prov.StopProviding(newMh) + require.ErrorIs(t, err, ErrClosed) + err = prov.ProvideOnce(newMh) + require.ErrorIs(t, err, ErrClosed) + require.Equal(t, 0, prov.Clear()) - _, err = prov.keyStore.Get(context.Background(), "") - require.ErrorIs(t, err, datastore.ErrKeyStoreClosed) + _, err = prov.keyStore.Get(context.Background(), "") + require.ErrorIs(t, err, datastore.ErrKeyStoreClosed) - err = prov.workerPool.Acquire(burstWorker) - require.ErrorIs(t, err, reservedpool.ErrClosed) + err = prov.workerPool.Acquire(burstWorker) + require.ErrorIs(t, err, reservedpool.ErrClosed) + }) } func TestProvideOnce(t *testing.T) { - pid, err := peer.Decode("12BoooooPEER") - require.NoError(t, err) - - online := atomic.Bool{} // false, start offline - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - if online.Load() { - return []peer.ID{pid}, nil - } - return nil, errors.New("offline") - }, - } - provideCount := atomic.Int32{} - msgSender := &mockMsgSender{ - sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { - provideCount.Add(1) - return nil - }, - } - clk := clock.NewMock() - - checkInterval := time.Second - offlineDelay := time.Minute - - opts := []Option{ - WithPeerID(pid), - WithRouter(router), - WithMessageSender(msgSender), - WithSelfAddrs(func() []ma.Multiaddr { - addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") - require.NoError(t, err) - return []ma.Multiaddr{addr} - }), - WithClock(clk), - WithOfflineDelay(offlineDelay), - WithConnectivityCheckOnlineInterval(checkInterval), - } - prov, err := New(opts...) - require.NoError(t, err) - defer prov.Close() - - h := genMultihashes(1)[0] - - // Node is offline, ProvideOne should error - err = prov.ProvideOnce(h) - require.ErrorIs(t, err, ErrOffline) - require.True(t, prov.provideQueue.IsEmpty()) - require.Equal(t, int32(0), provideCount.Load(), "should not have provided when offline 0") + synctest.Test(t, func(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") + require.NoError(t, err) - // Wait for provider to come online - online.Store(true) - clk.Add(checkInterval) // trigger connectivity check - waitUntil(t, prov.connectivity.IsOnline, 200*time.Millisecond, "connectivity should be online 0") + online := atomic.Bool{} // false, start offline + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + if online.Load() { + return []peer.ID{pid}, nil + } + return nil, errors.New("offline") + }, + } + provideCount := atomic.Int32{} + msgSender := &mockMsgSender{ + sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { + provideCount.Add(1) + return nil + }, + } - // Set the provider as disconnected - online.Store(false) - err = prov.ProvideOnce(h) - require.NoError(t, err) - time.Sleep(5 * time.Millisecond) // wait for ProvideOnce to finish - require.Equal(t, int32(0), provideCount.Load(), "should not have provided when offline 1") - // Ensure the key is in the provide queue - _, keys, ok := prov.provideQueue.Dequeue() - require.True(t, ok) - require.Equal(t, 1, len(keys)) - require.Equal(t, h, keys[0]) - - // Set the provider as online - online.Store(true) - clk.Add(checkInterval) // trigger connectivity check - waitUntil(t, prov.connectivity.IsOnline, 200*time.Millisecond, "connectivity should be online 1") - err = prov.ProvideOnce(h) - require.NoError(t, err) - waitUntil(t, func() bool { return provideCount.Load() == 1 }, 200*time.Millisecond, "waiting for ProvideOnce to finish") + checkInterval := time.Second + offlineDelay := time.Minute + + opts := []Option{ + WithPeerID(pid), + WithRouter(router), + WithMessageSender(msgSender), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + WithOfflineDelay(offlineDelay), + WithConnectivityCheckOnlineInterval(checkInterval), + } + prov, err := New(opts...) + require.NoError(t, err) + defer prov.Close() + + h := genMultihashes(1)[0] + + // Node is offline, ProvideOne should error + err = prov.ProvideOnce(h) + require.ErrorIs(t, err, ErrOffline) + require.True(t, prov.provideQueue.IsEmpty()) + require.Equal(t, int32(0), provideCount.Load(), "should not have provided when offline 0") + + // Wait for provider to come online + online.Store(true) + time.Sleep(checkInterval) // trigger connectivity check + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) + + // Set the provider as disconnected + online.Store(false) + synctest.Wait() + err = prov.ProvideOnce(h) + require.NoError(t, err) + synctest.Wait() // wait for ProvideOnce to finish + require.Equal(t, int32(0), provideCount.Load(), "should not have provided when offline 1") + // Ensure the key is in the provide queue + _, keys, ok := prov.provideQueue.Dequeue() + require.True(t, ok) + require.Equal(t, 1, len(keys)) + require.Equal(t, h, keys[0]) + + // Set the provider as online + online.Store(true) + time.Sleep(checkInterval) // trigger connectivity check + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) + err = prov.ProvideOnce(h) + require.NoError(t, err) + synctest.Wait() + require.Equal(t, int32(1), provideCount.Load()) + }) } func TestStartProvidingSingle(t *testing.T) { - pid, err := peer.Decode("12BoooooPEER") - require.NoError(t, err) - replicationFactor := 4 - h := genMultihashes(1)[0] + synctest.Test(t, func(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") + require.NoError(t, err) + replicationFactor := 4 + h := genMultihashes(1)[0] - mockClock := clock.NewMock() - reprovideInterval := time.Hour + reprovideInterval := time.Hour - prefixLen := 4 - peers := make([]peer.ID, replicationFactor) - peers[0], err = peer.Decode("12BooooPEER1") - require.NoError(t, err) - kbKey := keyspace.KeyToBytes(keyspace.PeerIDToBit256(peers[0])) - for i := range peers[1:] { - peers[i+1], err = kb.GenRandPeerIDWithCPL(kbKey, uint(prefixLen)) + prefixLen := 4 + peers := make([]peer.ID, replicationFactor) + seen := make(map[peer.ID]struct{}, replicationFactor) + peers[0], err = peer.Decode("12BooooPEER1") require.NoError(t, err) - } - - getClosestPeersCount := atomic.Int32{} - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - getClosestPeersCount.Add(1) - return peers, nil - }, - } - provideCount := atomic.Int32{} - msgSender := &mockMsgSender{ - sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { - provideCount.Add(1) - return nil - }, - } - checkInterval := time.Second - offlineDelay := time.Minute - opts := []Option{ - WithReplicationFactor(replicationFactor), - WithReprovideInterval(reprovideInterval), - WithPeerID(pid), - WithRouter(router), - WithMessageSender(msgSender), - WithSelfAddrs(func() []ma.Multiaddr { - addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + kbKey := keyspace.KeyToBytes(keyspace.PeerIDToBit256(peers[0])) + for i := range peers[1:] { + p, err := kb.GenRandPeerIDWithCPL(kbKey, uint(prefixLen)) require.NoError(t, err) - return []ma.Multiaddr{addr} - }), - WithClock(mockClock), - WithOfflineDelay(offlineDelay), - WithConnectivityCheckOnlineInterval(checkInterval), - } - prov, err := New(opts...) - require.NoError(t, err) - defer prov.Close() - time.Sleep(5 * time.Millisecond) // give some time for connectivity to be set + if _, ok := seen[p]; ok { + continue + } + seen[p] = struct{}{} + peers[i+1] = p + } + // Sort peers from closest to h, to furthest + slices.SortFunc(peers, func(a, b peer.ID) int { + targetKey := keyspace.MhToBit256(h) + return keyspace.PeerIDToBit256(a).Xor(targetKey).Compare(keyspace.PeerIDToBit256(b).Xor(targetKey)) + }) + + getClosestPeersCount := atomic.Int32{} + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + getClosestPeersCount.Add(1) + return peers, nil + }, + } + provideCount := atomic.Int32{} + msgSender := &mockMsgSender{ + sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { + provideCount.Add(1) + return nil + }, + } + checkInterval := time.Second + offlineDelay := time.Minute + opts := []Option{ + WithReplicationFactor(replicationFactor), + WithReprovideInterval(reprovideInterval), + WithPeerID(pid), + WithRouter(router), + WithMessageSender(msgSender), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + WithOfflineDelay(offlineDelay), + WithConnectivityCheckOnlineInterval(checkInterval), + } + prov, err := New(opts...) + require.NoError(t, err) + defer prov.Close() - mockClock.Add(checkInterval) // trigger connectivity check - waitUntil(t, func() bool { return prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be online") - waitUntil(t, func() bool { + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) prov.avgPrefixLenLk.Lock() - defer prov.avgPrefixLenLk.Unlock() - return prov.cachedAvgPrefixLen >= 0 - }, 100*time.Millisecond, "waiting for prefix len measurement") - - err = prov.StartProviding(true, h) - require.NoError(t, err) - waitUntil(t, func() bool { return provideCount.Load() == int32(len(peers)) }, 100*time.Millisecond, "waiting for StartProviding to finish") - expectedGCPCount := 1 + approxPrefixLenGCPCount + 1 // 1 for initial, approxPrefixLenGCPCount for prefix length estimation, 1 for the provide - require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + require.Greater(t, prov.cachedAvgPrefixLen, 0) // TODO: FLAKY + prov.avgPrefixLenLk.Unlock() - // Verify reprovide is scheduled. - prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:prefixLen]) - prov.scheduleLk.Lock() - require.Equal(t, 1, prov.schedule.Size()) - found, reprovideTime := trie.Find(prov.schedule, prefix) - if !found { - t.Log(prefix) - t.Log(keyspace.AllEntries(prov.schedule, prov.order)[0].Key) - t.Fatal("prefix not inserted in schedule") - } - require.Equal(t, prov.reprovideTimeForPrefix(prefix), reprovideTime) - prov.scheduleLk.Unlock() + err = prov.StartProviding(true, h) + require.NoError(t, err) + synctest.Wait() + require.Equal(t, int32(len(peers)), provideCount.Load()) + expectedGCPCount := 1 + approxPrefixLenGCPCount + 1 // 1 for initial, approxPrefixLenGCPCount for prefix length estimation, 1 for the provide + require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + + // Verify reprovide is scheduled. + prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:prefixLen]) + prov.scheduleLk.Lock() + require.Equal(t, 1, prov.schedule.Size()) + found, reprovideTime := trie.Find(prov.schedule, prefix) + if !found { + t.Log(prefix) + t.Log(keyspace.AllEntries(prov.schedule, prov.order)[0].Key) + require.FailNow(t, "prefix not inserted in schedule") + } + require.Equal(t, prov.reprovideTimeForPrefix(prefix), reprovideTime) + prov.scheduleLk.Unlock() - // Try to provide the same key again -> noop - err = prov.StartProviding(false, h) - require.NoError(t, err) - time.Sleep(5 * time.Millisecond) - require.Equal(t, int32(len(peers)), provideCount.Load()) - require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) - - // Verify reprovide happens as scheduled. - mockClock.Add(reprovideTime) - expectedGCPCount++ // for the reprovide - waitUntil(t, func() bool { return provideCount.Load() == 2*int32(len(peers)) }, 200*time.Millisecond, "waiting for reprovide to finish 0") - require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) - - mockClock.Add(reprovideInterval) - expectedGCPCount++ // for the reprovide - waitUntil(t, func() bool { return provideCount.Load() == 3*int32(len(peers)) }, 200*time.Millisecond, "waiting for reprovide to finish 1") - require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) - require.Equal(t, 3*int32(len(peers)), provideCount.Load()) - - mockClock.Add(reprovideInterval) - expectedGCPCount++ // for the reprovide - waitUntil(t, func() bool { return provideCount.Load() == 4*int32(len(peers)) }, 200*time.Millisecond, "waiting for reprovide to finish 2") - require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + // Try to provide the same key again -> noop + err = prov.StartProviding(false, h) + require.NoError(t, err) + synctest.Wait() + require.Equal(t, int32(len(peers)), provideCount.Load()) + require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + + // Verify reprovide happens as scheduled. + time.Sleep(reprovideTime) + synctest.Wait() + expectedGCPCount++ // for the reprovide + require.Equal(t, 2*int32(len(peers)), provideCount.Load()) + require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + + time.Sleep(reprovideInterval) + synctest.Wait() + expectedGCPCount++ // for the reprovide + require.Equal(t, 3*int32(len(peers)), provideCount.Load()) + require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + + time.Sleep(reprovideInterval) + synctest.Wait() + expectedGCPCount++ // for the reprovide + require.Equal(t, 4*int32(len(peers)), provideCount.Load()) + require.Equal(t, expectedGCPCount, int(getClosestPeersCount.Load())) + }) } const bitsPerByte = 8 func TestStartProvidingMany(t *testing.T) { - pid, err := peer.Decode("12BoooooPEER") - require.NoError(t, err) - - nKeysExponent := 10 - nKeys := 1 << nKeysExponent - mhs := genBalancedMultihashes(nKeysExponent) - - replicationFactor := 4 - peerPrefixBitlen := 6 - require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte) - var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen - peers := make([]peer.ID, nPeers) - for i := range nPeers { - b := i << (bitsPerByte - peerPrefixBitlen) - k := [32]byte{b} - peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen)) + synctest.Test(t, func(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") require.NoError(t, err) - } - - mockClock := clock.NewMock() - reprovideInterval := time.Hour - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) - return sortedPeers[:min(replicationFactor, len(peers))], nil - }, - } - msgSenderLk := sync.Mutex{} - addProviderRpcs := make(map[string]map[peer.ID]int) // key -> peerid -> count - provideCount := atomic.Int32{} - msgSender := &mockMsgSender{ - sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { - msgSenderLk.Lock() - defer msgSenderLk.Unlock() - _, k, err := mh.MHFromBytes(m.GetKey()) + nKeysExponent := 10 + nKeys := 1 << nKeysExponent + mhs := genBalancedMultihashes(nKeysExponent) + + replicationFactor := 4 + peerPrefixBitlen := 6 + require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte) + var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen + peers := make([]peer.ID, nPeers) + for i := range nPeers { + b := i << (bitsPerByte - peerPrefixBitlen) + k := [32]byte{b} + peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen)) require.NoError(t, err) - if _, ok := addProviderRpcs[string(k)]; !ok { - addProviderRpcs[string(k)] = make(map[peer.ID]int) - } - addProviderRpcs[string(k)][p]++ - provideCount.Add(1) - return nil - }, - } - opts := []Option{ - WithReprovideInterval(reprovideInterval), - WithReplicationFactor(replicationFactor), - WithMaxWorkers(1), - WithDedicatedBurstWorkers(0), - WithDedicatedPeriodicWorkers(0), - WithPeerID(pid), - WithRouter(router), - WithMessageSender(msgSender), - WithSelfAddrs(func() []ma.Multiaddr { - addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") - require.NoError(t, err) - return []ma.Multiaddr{addr} - }), - WithClock(mockClock), - } - prov, err := New(opts...) - require.NoError(t, err) - defer prov.Close() - waitUntil(t, prov.connectivity.IsOnline, 100*time.Millisecond, "connectivity should be online") + } - err = prov.StartProviding(true, mhs...) - require.NoError(t, err) - waitUntil(t, func() bool { return provideCount.Load() == int32(len(mhs)*replicationFactor) }, 100*time.Millisecond, "waiting for ProvideMany to finish") + reprovideInterval := time.Hour - // Each key should have been provided at least once. - msgSenderLk.Lock() - require.Equal(t, nKeys, len(addProviderRpcs)) - for k, holders := range addProviderRpcs { - // Verify that all keys have been provided to exactly replicationFactor - // distinct peers. - require.Len(t, holders, replicationFactor) - for _, count := range holders { - require.Equal(t, 1, count) + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) + return sortedPeers[:min(replicationFactor, len(peers))], nil + }, } - // Verify provider records are assigned to the closest peers - closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor] - for _, p := range closestPeers { - require.Contains(t, holders, p) + msgSenderLk := sync.Mutex{} + addProviderRpcs := make(map[string]map[peer.ID]int) // key -> peerid -> count + provideCount := atomic.Int32{} + msgSender := &mockMsgSender{ + sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { + msgSenderLk.Lock() + defer msgSenderLk.Unlock() + _, k, err := mh.MHFromBytes(m.GetKey()) + require.NoError(t, err) + if _, ok := addProviderRpcs[string(k)]; !ok { + addProviderRpcs[string(k)] = make(map[peer.ID]int) + } + addProviderRpcs[string(k)][p]++ + provideCount.Add(1) + return nil + }, } - } + opts := []Option{ + WithReprovideInterval(reprovideInterval), + WithReplicationFactor(replicationFactor), + WithMaxWorkers(1), + WithDedicatedBurstWorkers(0), + WithDedicatedPeriodicWorkers(0), + WithPeerID(pid), + WithRouter(router), + WithMessageSender(msgSender), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + } + prov, err := New(opts...) + require.NoError(t, err) + defer prov.Close() + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) - step := 10 * time.Second - // Test reprovides, clear addProviderRpcs - clear(addProviderRpcs) - msgSenderLk.Unlock() - for range reprovideInterval / step { - mockClock.Add(step) - } - waitUntil(t, func() bool { return provideCount.Load() == 2*int32(len(mhs)*replicationFactor) }, 200*time.Millisecond, "waiting for reprovide to finish 0") + err = prov.StartProviding(true, mhs...) + require.NoError(t, err) + synctest.Wait() + require.Equal(t, int32(len(mhs)*replicationFactor), provideCount.Load()) - msgSenderLk.Lock() - require.Equal(t, nKeys, len(addProviderRpcs)) - for k, holders := range addProviderRpcs { - // Verify that all keys have been provided to exactly replicationFactor - // distinct peers. - require.Len(t, holders, replicationFactor, key.BitString(keyspace.MhToBit256([]byte(k)))) - for _, count := range holders { - require.Equal(t, 1, count) + // Each key should have been provided at least once. + msgSenderLk.Lock() + require.Equal(t, nKeys, len(addProviderRpcs)) + for k, holders := range addProviderRpcs { + // Verify that all keys have been provided to exactly replicationFactor + // distinct peers. + require.Len(t, holders, replicationFactor) + for _, count := range holders { + require.Equal(t, 1, count) + } + // Verify provider records are assigned to the closest peers + closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor] + for _, p := range closestPeers { + require.Contains(t, holders, p) + } } - // Verify provider records are assigned to the closest peers - closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor] - for _, p := range closestPeers { - require.Contains(t, holders, p) + + step := 10 * time.Second + // Test reprovides, clear addProviderRpcs + clear(addProviderRpcs) + msgSenderLk.Unlock() + for range reprovideInterval / step { + time.Sleep(step) } - } + synctest.Wait() + require.Equal(t, 2*int32(len(mhs)*replicationFactor), provideCount.Load(), "should have reprovided all keys at least once") - // Test reprovides again, clear addProviderRpcs - clear(addProviderRpcs) - msgSenderLk.Unlock() - for range reprovideInterval / step { - mockClock.Add(step) - } - waitUntil(t, func() bool { return provideCount.Load() == 3*int32(len(mhs)*replicationFactor) }, 200*time.Millisecond, "waiting for reprovide to finish 1") + msgSenderLk.Lock() + require.Equal(t, nKeys, len(addProviderRpcs)) + for k, holders := range addProviderRpcs { + // Verify that all keys have been provided to exactly replicationFactor + // distinct peers. + require.Len(t, holders, replicationFactor, key.BitString(keyspace.MhToBit256([]byte(k)))) + for _, count := range holders { + require.Equal(t, 1, count) + } + // Verify provider records are assigned to the closest peers + closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor] + for _, p := range closestPeers { + require.Contains(t, holders, p) + } + } - msgSenderLk.Lock() - require.Equal(t, nKeys, len(addProviderRpcs)) - for k, holders := range addProviderRpcs { - // Verify that all keys have been provided to exactly replicationFactor - // distinct peers. - require.Len(t, holders, replicationFactor) - for _, count := range holders { - require.Equal(t, 1, count) + // Test reprovides again, clear addProviderRpcs + clear(addProviderRpcs) + msgSenderLk.Unlock() + for range reprovideInterval / step { + time.Sleep(step) } - // Verify provider records are assigned to the closest peers - closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor] - for _, p := range closestPeers { - require.Contains(t, holders, p) + synctest.Wait() + require.Equal(t, 3*int32(len(mhs)*replicationFactor), provideCount.Load(), "should have reprovided all keys at least twice") + + msgSenderLk.Lock() + require.Equal(t, nKeys, len(addProviderRpcs)) + for k, holders := range addProviderRpcs { + // Verify that all keys have been provided to exactly replicationFactor + // distinct peers. + require.Len(t, holders, replicationFactor) + for _, count := range holders { + require.Equal(t, 1, count) + } + // Verify provider records are assigned to the closest peers + closestPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k))[:replicationFactor] + for _, p := range closestPeers { + require.Contains(t, holders, p) + } } - } - msgSenderLk.Unlock() + msgSenderLk.Unlock() + }) } func TestStartProvidingUnstableNetwork(t *testing.T) { - pid, err := peer.Decode("12BoooooPEER") - require.NoError(t, err) - - nKeysExponent := 10 - nKeys := 1 << nKeysExponent - mhs := genBalancedMultihashes(nKeysExponent) - - replicationFactor := 4 - peerPrefixBitlen := 6 - require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte) - var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen - peers := make([]peer.ID, nPeers) - for i := range nPeers { - b := i << (bitsPerByte - peerPrefixBitlen) - k := [32]byte{b} - peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen)) + synctest.Test(t, func(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") require.NoError(t, err) - } - mockClock := clock.NewMock() - reprovideInterval := time.Hour - connectivityCheckInterval := time.Second - offlineDelay := time.Minute - - routerOffline := atomic.Bool{} - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - if routerOffline.Load() { - return nil, errors.New("offline") - } - sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) - return sortedPeers[:min(replicationFactor, len(peers))], nil - }, - } - msgSenderLk := sync.Mutex{} - addProviderRpcs := make(map[string]map[peer.ID]int) // key -> peerid -> count - provideCount := atomic.Int32{} - msgSender := &mockMsgSender{ - sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { - msgSenderLk.Lock() - defer msgSenderLk.Unlock() - if routerOffline.Load() { - return errors.New("offline") - } - _, k, err := mh.MHFromBytes(m.GetKey()) + nKeysExponent := 10 + nKeys := 1 << nKeysExponent + mhs := genBalancedMultihashes(nKeysExponent) + + replicationFactor := 4 + peerPrefixBitlen := 6 + require.LessOrEqual(t, peerPrefixBitlen, bitsPerByte) + var nPeers byte = 1 << peerPrefixBitlen // 2**peerPrefixBitlen + peers := make([]peer.ID, nPeers) + for i := range nPeers { + b := i << (bitsPerByte - peerPrefixBitlen) + k := [32]byte{b} + peers[i], err = kb.GenRandPeerIDWithCPL(k[:], uint(peerPrefixBitlen)) require.NoError(t, err) - if _, ok := addProviderRpcs[string(k)]; !ok { - addProviderRpcs[string(k)] = make(map[peer.ID]int) - } - addProviderRpcs[string(k)][p]++ - provideCount.Add(1) - return nil - }, - } - opts := []Option{ - WithReprovideInterval(reprovideInterval), - WithReplicationFactor(replicationFactor), - WithMaxWorkers(1), - WithDedicatedBurstWorkers(0), - WithDedicatedPeriodicWorkers(0), - WithPeerID(pid), - WithRouter(router), - WithMessageSender(msgSender), - WithSelfAddrs(func() []ma.Multiaddr { - addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") - require.NoError(t, err) - return []ma.Multiaddr{addr} - }), - WithClock(mockClock), - WithOfflineDelay(offlineDelay), - WithConnectivityCheckOnlineInterval(connectivityCheckInterval), - } - prov, err := New(opts...) - require.NoError(t, err) - defer prov.Close() + } - waitUntil(t, func() bool { + reprovideInterval := time.Hour + connectivityCheckInterval := time.Minute + offlineDelay := time.Hour + + routerOffline := atomic.Bool{} + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + if routerOffline.Load() { + return nil, errors.New("offline") + } + sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(k)) + return sortedPeers[:min(replicationFactor, len(peers))], nil + }, + } + msgSenderLk := sync.Mutex{} + addProviderRpcs := make(map[string]map[peer.ID]int) // key -> peerid -> count + provideCount := atomic.Int32{} + msgSender := &mockMsgSender{ + sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error { + msgSenderLk.Lock() + defer msgSenderLk.Unlock() + if routerOffline.Load() { + return errors.New("offline") + } + _, k, err := mh.MHFromBytes(m.GetKey()) + require.NoError(t, err) + if _, ok := addProviderRpcs[string(k)]; !ok { + addProviderRpcs[string(k)] = make(map[peer.ID]int) + } + addProviderRpcs[string(k)][p]++ + provideCount.Add(1) + return nil + }, + } + opts := []Option{ + WithReprovideInterval(reprovideInterval), + WithReplicationFactor(replicationFactor), + WithMaxWorkers(1), + WithDedicatedBurstWorkers(0), + WithDedicatedPeriodicWorkers(0), + WithPeerID(pid), + WithRouter(router), + WithMessageSender(msgSender), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + WithOfflineDelay(offlineDelay), + WithConnectivityCheckOnlineInterval(connectivityCheckInterval), + } + prov, err := New(opts...) + require.NoError(t, err) + defer prov.Close() + + synctest.Wait() prov.avgPrefixLenLk.Lock() - defer prov.avgPrefixLenLk.Unlock() - return prov.cachedAvgPrefixLen >= 0 - }, 100*time.Millisecond, "waiting for initial average prefix length to be set") - routerOffline.Store(true) - mockClock.Add(connectivityCheckInterval) + require.Greater(t, prov.cachedAvgPrefixLen, 0) + prov.avgPrefixLenLk.Unlock() - err = prov.StartProviding(true, mhs...) - require.NoError(t, err) - time.Sleep(10 * time.Millisecond) // wait for StartProviding to finish - require.Equal(t, int32(0), provideCount.Load(), "should not have provided when disconnected") + routerOffline.Store(true) + time.Sleep(connectivityCheckInterval) // wait for connectivity check to become available again + synctest.Wait() - nodeOffline := func() bool { - return !prov.connectivity.IsOnline() - } - waitUntil(t, nodeOffline, 100*time.Millisecond, "waiting for node to be disconnected") - mockClock.Add(connectivityCheckInterval) + err = prov.StartProviding(true, mhs...) + require.NoError(t, err) + synctest.Wait() + require.Equal(t, int32(0), provideCount.Load(), "should not have provided when disconnected") + require.False(t, prov.connectivity.IsOnline()) - routerOffline.Store(false) - mockClock.Add(connectivityCheckInterval) - waitUntil(t, prov.connectivity.IsOnline, 100*time.Millisecond, "waiting for node to come back online") + routerOffline.Store(false) + time.Sleep(connectivityCheckInterval) // connectivity check triggered + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) - providedAllKeys := func() bool { msgSenderLk.Lock() - defer msgSenderLk.Unlock() - if len(addProviderRpcs) != nKeys { - return false - } + require.Equal(t, nKeys, len(addProviderRpcs)) for _, peers := range addProviderRpcs { // Verify that all keys have been provided to exactly replicationFactor // distinct peers. - if len(peers) != replicationFactor { - return false - } + require.Len(t, peers, replicationFactor) } - return true - } - waitUntil(t, providedAllKeys, 200*time.Millisecond, "waiting for all keys to be reprovided") + msgSenderLk.Unlock() + }) } func TestAddToSchedule(t *testing.T) { - mockClock := clock.NewMock() prov := SweepingProvider{ - clock: mockClock, reprovideInterval: time.Hour, schedule: trie.New[bitstr.Key, time.Duration](), - scheduleTimer: mockClock.Timer(time.Hour), + scheduleTimer: time.NewTimer(time.Hour), cachedAvgPrefixLen: 4, avgPrefixLenValidity: time.Minute, - lastAvgPrefixLen: mockClock.Now(), + lastAvgPrefixLen: time.Now(), } ok, _ := trie.Find(prov.schedule, "0000") @@ -1250,18 +1248,16 @@ func TestRefreshSchedule(t *testing.T) { keyStore, err := datastore.NewKeyStore(mapDs) require.NoError(t, err) - mockClock := clock.NewMock() prov := SweepingProvider{ keyStore: keyStore, - clock: mockClock, reprovideInterval: time.Hour, schedule: trie.New[bitstr.Key, time.Duration](), - scheduleTimer: mockClock.Timer(time.Hour), + scheduleTimer: time.NewTimer(time.Hour), cachedAvgPrefixLen: 4, avgPrefixLenValidity: time.Minute, - lastAvgPrefixLen: mockClock.Now(), + lastAvgPrefixLen: time.Now(), } // Schedule is empty @@ -1306,121 +1302,123 @@ func TestRefreshSchedule(t *testing.T) { } func TestOperationsOffline(t *testing.T) { - pid, err := peer.Decode("12BoooooPEER") - require.NoError(t, err) + synctest.Test(t, func(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") + require.NoError(t, err) - mockClock := clock.NewMock() - checkInterval := time.Second - offlineDelay := time.Minute + checkInterval := time.Second + offlineDelay := time.Minute - online := atomic.Bool{} // false, start offline + online := atomic.Bool{} // false, start offline - router := &mockRouter{ - getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { - if online.Load() { - return []peer.ID{pid}, nil - } - return nil, errors.New("offline") - }, - } - opts := []Option{ - WithReprovideInterval(time.Hour), - WithReplicationFactor(1), - WithMaxWorkers(1), - WithDedicatedBurstWorkers(0), - WithDedicatedPeriodicWorkers(0), - WithPeerID(pid), - WithRouter(router), - WithMessageSender(&mockMsgSender{}), - WithSelfAddrs(func() []ma.Multiaddr { - addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") - require.NoError(t, err) - return []ma.Multiaddr{addr} - }), - WithClock(mockClock), - WithOfflineDelay(offlineDelay), - WithConnectivityCheckOnlineInterval(checkInterval), - } - prov, err := New(opts...) - require.NoError(t, err) - defer prov.Close() - - k := random.Multihashes(1)[0] - - // Not bootstrapped yet, OFFLINE - err = prov.ProvideOnce(k) - require.ErrorIs(t, err, ErrOffline) - err = prov.StartProviding(false, k) - require.ErrorIs(t, err, ErrOffline) - err = prov.StartProviding(true, k) - require.ErrorIs(t, err, ErrOffline) - err = prov.RefreshSchedule() - require.ErrorIs(t, err, ErrOffline) - err = prov.AddToSchedule(k) - require.ErrorIs(t, err, ErrOffline) - err = prov.StopProviding(k) // no error for StopProviding - require.NoError(t, err) + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + if online.Load() { + return []peer.ID{pid}, nil + } + return nil, errors.New("offline") + }, + } + opts := []Option{ + WithReprovideInterval(time.Hour), + WithReplicationFactor(1), + WithMaxWorkers(1), + WithDedicatedBurstWorkers(0), + WithDedicatedPeriodicWorkers(0), + WithPeerID(pid), + WithRouter(router), + WithMessageSender(&mockMsgSender{}), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + WithOfflineDelay(offlineDelay), + WithConnectivityCheckOnlineInterval(checkInterval), + } + prov, err := New(opts...) + require.NoError(t, err) + defer prov.Close() + + k := random.Multihashes(1)[0] + + // Not bootstrapped yet, OFFLINE + err = prov.ProvideOnce(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(false, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(true, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.RefreshSchedule() + require.ErrorIs(t, err, ErrOffline) + err = prov.AddToSchedule(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) - online.Store(true) - mockClock.Add(checkInterval) // trigger connectivity check - waitUntil(t, prov.connectivity.IsOnline, 100*time.Millisecond, "connectivity should be online") + online.Store(true) + time.Sleep(checkInterval) // trigger connectivity check + synctest.Wait() + require.True(t, prov.connectivity.IsOnline()) - // ONLINE, operations shouldn't error - err = prov.ProvideOnce(k) - require.NoError(t, err) - err = prov.StartProviding(false, k) - require.NoError(t, err) - err = prov.StartProviding(true, k) - require.NoError(t, err) - err = prov.RefreshSchedule() - require.NoError(t, err) - err = prov.AddToSchedule(k) - require.NoError(t, err) - err = prov.StopProviding(k) // no error for StopProviding - require.NoError(t, err) + // ONLINE, operations shouldn't error + err = prov.ProvideOnce(k) + require.NoError(t, err) + err = prov.StartProviding(false, k) + require.NoError(t, err) + err = prov.StartProviding(true, k) + require.NoError(t, err) + err = prov.RefreshSchedule() + require.NoError(t, err) + err = prov.AddToSchedule(k) + require.NoError(t, err) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) - online.Store(false) - mockClock.Add(checkInterval) // trigger connectivity check - prov.connectivity.TriggerCheck() - waitUntil(t, func() bool { return !prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be offline") + online.Store(false) + time.Sleep(checkInterval) // wait for connectivity check to finish + prov.connectivity.TriggerCheck() + synctest.Wait() + require.False(t, prov.connectivity.IsOnline()) - // DISCONNECTED, operations shoudln't error until node is OFFLINE - err = prov.ProvideOnce(k) - require.NoError(t, err) - err = prov.StartProviding(false, k) - require.NoError(t, err) - err = prov.StartProviding(true, k) - require.NoError(t, err) - err = prov.RefreshSchedule() - require.NoError(t, err) - err = prov.AddToSchedule(k) - require.NoError(t, err) - err = prov.StopProviding(k) // no error for StopProviding - require.NoError(t, err) + // DISCONNECTED, operations shoudln't error until node is OFFLINE + err = prov.ProvideOnce(k) + require.NoError(t, err) + err = prov.StartProviding(false, k) + require.NoError(t, err) + err = prov.StartProviding(true, k) + require.NoError(t, err) + err = prov.RefreshSchedule() + require.NoError(t, err) + err = prov.AddToSchedule(k) + require.NoError(t, err) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) - prov.provideQueue.Enqueue("0000", k) - require.Equal(t, 1, prov.provideQueue.Size()) - mockClock.Add(offlineDelay) - time.Sleep(5 * time.Millisecond) // wait for connectivity check to finish - - // OFFLINE - // Verify that provide queue has been emptied by the onOffline callback - require.True(t, prov.provideQueue.IsEmpty()) - prov.avgPrefixLenLk.Lock() - require.Equal(t, -1, prov.cachedAvgPrefixLen) - prov.avgPrefixLenLk.Unlock() - - // All operations should error again - err = prov.ProvideOnce(k) - require.ErrorIs(t, err, ErrOffline) - err = prov.StartProviding(false, k) - require.ErrorIs(t, err, ErrOffline) - err = prov.StartProviding(true, k) - require.ErrorIs(t, err, ErrOffline) - err = prov.RefreshSchedule() - require.ErrorIs(t, err, ErrOffline) - err = prov.AddToSchedule(k) - require.ErrorIs(t, err, ErrOffline) - err = prov.StopProviding(k) // no error for StopProviding - require.NoError(t, err) + prov.provideQueue.Enqueue("0000", k) + require.Equal(t, 1, prov.provideQueue.Size()) + time.Sleep(offlineDelay) + synctest.Wait() + + // OFFLINE + // Verify that provide queue has been emptied by the onOffline callback + require.True(t, prov.provideQueue.IsEmpty()) + prov.avgPrefixLenLk.Lock() + require.Equal(t, -1, prov.cachedAvgPrefixLen) + prov.avgPrefixLenLk.Unlock() + + // All operations should error again + err = prov.ProvideOnce(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(false, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(true, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.RefreshSchedule() + require.ErrorIs(t, err, ErrOffline) + err = prov.AddToSchedule(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) + }) }