From 02d60e017f98eae7ae3ec53daad368c5e51d817b Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 26 Aug 2025 22:24:54 +0200 Subject: [PATCH] docs and tests --- dual/provider/options.go | 2 +- dual/provider/provider.go | 20 ++- .../internal/connectivity/connectivity.go | 87 +++++++---- .../connectivity/connectivity_test.go | 22 +-- provider/internal/connectivity/options.go | 18 +-- provider/options.go | 22 +-- provider/provider.go | 39 ++++- provider/provider_test.go | 135 ++++++++++++++++-- 8 files changed, 248 insertions(+), 97 deletions(-) diff --git a/dual/provider/options.go b/dual/provider/options.go index 8730e80b6..f643ded12 100644 --- a/dual/provider/options.go +++ b/dual/provider/options.go @@ -76,8 +76,8 @@ var DefaultConfig = func(cfg *config) error { cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval} cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay} + cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay} cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval} - cfg.connectivityCheckOfflineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOfflineInterval, provider.DefaultConnectivityCheckOfflineInterval} cfg.maxWorkers = [2]int{4, 4} cfg.dedicatedPeriodicWorkers = [2]int{2, 2} diff --git a/dual/provider/provider.go b/dual/provider/provider.go index 05a3b9f8b..661a8adbc 100644 --- a/dual/provider/provider.go +++ b/dual/provider/provider.go @@ -59,7 +59,6 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) { provider.WithMaxReprovideDelay(cfg.maxReprovideDelay[i]), provider.WithOfflineDelay(cfg.offlineDelay[i]), provider.WithConnectivityCheckOnlineInterval(cfg.connectivityCheckOnlineInterval[i]), - provider.WithConnectivityCheckOfflineInterval(cfg.connectivityCheckOfflineInterval[i]), provider.WithMaxWorkers(cfg.maxWorkers[i]), provider.WithDedicatedPeriodicWorkers(cfg.dedicatedPeriodicWorkers[i]), provider.WithDedicatedBurstWorkers(cfg.dedicatedBurstWorkers[i]), @@ -102,8 +101,14 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) e // ProvideOnce sends provider records for the specified keys to both DHT swarms // only once. It does not automatically reprovide those keys afterward. // -// Add the supplied multihashes to the provide queue, and return immediately. +// Add the supplied multihashes to the provide queues, and return right after. // The provide operation happens asynchronously. +// +// Returns an error if the keys couldn't be added to the provide queue. This +// can happen if the provider is closed or if the node is currently Offline +// (either never bootstrapped, or disconnected since more than `OfflineDelay`). +// The schedule and provide queue depend on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error { return s.runOnBoth(func(p *provider.SweepingProvider) error { return p.ProvideOnce(keys...) @@ -121,6 +126,12 @@ func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error { // // This operation is asynchronous, it returns as soon as the `keys` are added // to the provide queue, and provides happens asynchronously. +// +// Returns an error if the keys couldn't be added to the provide queue. This +// can happen if the provider is closed or if the node is currently Offline +// (either never bootstrapped, or disconnected since more than `OfflineDelay`). +// The schedule and provide queue depend on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error { ctx := context.Background() newKeys, err := s.keyStore.Put(ctx, keys...) @@ -173,6 +184,11 @@ func (s *SweepingProvider) Clear() int { // This function doesn't remove prefixes that have no keys from the schedule. // This is done automatically during the reprovide operation if a region has no // keys. +// +// Returns an error if the provider is closed or if the node is currently +// Offline (either never bootstrapped, or disconnected since more than +// `OfflineDelay`). The schedule depends on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) RefreshSchedule() error { return s.runOnBoth(func(p *provider.SweepingProvider) error { return p.RefreshSchedule() diff --git a/provider/internal/connectivity/connectivity.go b/provider/internal/connectivity/connectivity.go index d9e8c86b5..63391c73d 100644 --- a/provider/internal/connectivity/connectivity.go +++ b/provider/internal/connectivity/connectivity.go @@ -8,19 +8,33 @@ import ( "github.com/filecoin-project/go-clock" ) +const ( + initialBackoffDelay = 100 * time.Millisecond + maxBackoffDelay = time.Minute +) + // ConnectivityChecker provides a thread-safe way to verify the connectivity of -// a node, and triggers a wake-up callback when the node comes back online -// after a period offline. +// a node, and triggers wake-up callbacks when the node changes connectivity +// state. The `checkFunc` callback used to verify network connectivity is user +// supplied. // -// Key behaviors: -// - Connectivity check: external function `checkFunc` supplied by caller. -// - Online handling: only run connectivity check upon call of triggerCheck() -// if at least `onlineCheckInterval` has passed since the last check. -// - Offline handling: while offline, triggerCheck() is ignored. -// – A background loop runs `checkFunc` every `offlineCheckInterval` until -// connectivity is restored. -// – Once back online, the node’s status is updated and `backOnlineNotify` -// is invoked exactly once. +// State Machine starting in OFFLINE state (when `Start()` is called) +// 1. OFFLINE state: +// - Calls `checkFunc` with exponential backoff until node is found ONLINE. +// - Calls to `TriggerCheck()` are ignored while OFFLINE. +// - When `checkFunc` returns true, state changes to ONLINE and +// `onOnline()` callback is called. +// 2. ONLINE state: +// - Calls to `TriggerCheck()` will call `checkFunc` only if at least +// `onlineCheckInterval` has passed since the last check. +// - If `TriggerCheck()` returns false, switch state to DISCONNECTED. +// 3. DISCONNECTED state: +// - Calls `checkFunc` with exponential backoff until node is found ONLINE. +// - Calls to `TriggerCheck()` are ignored while DISCONNECTED. +// - When `checkFunc` returns true, state changes to ONLINE and +// `onOnline()` callback is called. +// - After `offlineDelay` has passed in DISCONNECTED state, state changes +// to OFFLINE and `onOffline()` callback is called. type ConnectivityChecker struct { done chan struct{} closed bool @@ -29,10 +43,9 @@ type ConnectivityChecker struct { online atomic.Bool - clock clock.Clock - lastCheck time.Time - onlineCheckInterval time.Duration // minimum check interval when online - offlineCheckInterval time.Duration // periodic check frequency when offline + clock clock.Clock + lastCheck time.Time + onlineCheckInterval time.Duration // minimum check interval when online checkFunc func() bool // function to check whether node is online @@ -49,20 +62,21 @@ func New(checkFunc func() bool, opts ...Option) (*ConnectivityChecker, error) { return nil, err } c := &ConnectivityChecker{ - done: make(chan struct{}), - checkFunc: checkFunc, - clock: cfg.clock, - onlineCheckInterval: cfg.onlineCheckInterval, - offlineCheckInterval: cfg.offlineCheckInterval, - onOffline: cfg.onOffline, - onOnline: cfg.onOnline, - offlineDelay: cfg.offlineDelay, + done: make(chan struct{}), + checkFunc: checkFunc, + clock: cfg.clock, + onlineCheckInterval: cfg.onlineCheckInterval, + onOffline: cfg.onOffline, + onOnline: cfg.onOnline, + offlineDelay: cfg.offlineDelay, } return c, nil } // SetCallbacks sets the onOnline and onOffline callbacks after construction. // This allows breaking circular dependencies during initialization. +// +// SetCallbacks must be called before Start(). func (c *ConnectivityChecker) SetCallbacks(onOnline, onOffline func()) { c.mutex.Lock() defer c.mutex.Unlock() @@ -73,6 +87,10 @@ func (c *ConnectivityChecker) SetCallbacks(onOnline, onOffline func()) { c.onOffline = onOffline } +// Start the ConnectivityChecker in Offline state, by begining connectivity +// probes, until the node is found Online. +// +// If SetCallbacks() is used, Start() must be called after SetCallbacks(). func (c *ConnectivityChecker) Start() { c.mutex.Lock() // Start probing until the node comes online @@ -115,7 +133,7 @@ func (c *ConnectivityChecker) IsOnline() bool { // - When node is found back online, run the `backOnlineNotify` callback. func (c *ConnectivityChecker) TriggerCheck() { if !c.mutex.TryLock() { - return // already checking + return // check already in progress } if c.closed { c.mutex.Unlock() @@ -134,13 +152,16 @@ func (c *ConnectivityChecker) TriggerCheck() { return } - // Node is disconnected, start periodic checks + // Online -> Disconnected c.online.Store(false) + // Start periodic checks until node comes back Online c.probeLoop(false) }() } +// probeLoop runs connectivity probes with exponential backoff until the node +// comes back Online, or the ConnectivityChecker is closed. func (c *ConnectivityChecker) probeLoop(init bool) { var offlineC <-chan time.Time if !init { @@ -149,19 +170,21 @@ func (c *ConnectivityChecker) probeLoop(init bool) { offlineC = offlineTimer.C } - ticker := c.clock.Ticker(c.offlineCheckInterval) - // TODO: exponential backoff - defer ticker.Stop() + delay := initialBackoffDelay + timer := c.clock.Timer(delay) + defer timer.Stop() for { select { case <-c.done: return - case <-ticker.C: + case <-timer.C: if c.probe() { return } + delay = min(2*delay, maxBackoffDelay) + timer.Reset(delay) case <-offlineC: - // Node is now offline + // Disconnected -> Offline if c.onOffline != nil { c.onOffline() } @@ -169,12 +192,14 @@ func (c *ConnectivityChecker) probeLoop(init bool) { } } +// probe runs the connectivity check function once, and if the node is found +// Online, updates the state and runs the onOnline callback. func (c *ConnectivityChecker) probe() bool { if c.checkFunc() { select { case <-c.done: default: - // Node is back online. + // Node is back Online. c.online.Store(true) c.lastCheck = c.clock.Now() diff --git a/provider/internal/connectivity/connectivity_test.go b/provider/internal/connectivity/connectivity_test.go index 00b2277c4..c7417a1a4 100644 --- a/provider/internal/connectivity/connectivity_test.go +++ b/provider/internal/connectivity/connectivity_test.go @@ -59,7 +59,6 @@ func TestConnectivityChecker_New(t *testing.T) { checker, err := New(checkFunc, WithClock(mockClock), WithOnlineCheckInterval(30*time.Second), - WithOfflineCheckInterval(10*time.Second), ) require.NoError(t, err) defer checker.Close() @@ -75,11 +74,6 @@ func TestConnectivityChecker_New(t *testing.T) { ) assert.Error(t, err) - _, err = New(checkFunc, - WithOfflineCheckInterval(0), - ) - assert.Error(t, err) - _, err = New(checkFunc, WithOfflineDelay(-1*time.Second), ) @@ -173,8 +167,7 @@ func TestConnectivityChecker_TriggerCheck_WithMockClock(t *testing.T) { checker, err := New(checkFunc, WithOnlineCheckInterval(10*time.Millisecond), // Very short interval - WithOfflineCheckInterval(20*time.Millisecond), - WithOfflineDelay(100*time.Millisecond), // Short delay for testing + WithOfflineDelay(100*time.Millisecond), // Short delay for testing WithOnOnline(onlineNotify), WithOnOffline(offlineNotify), ) @@ -220,7 +213,6 @@ func TestConnectivityChecker_TriggerCheck_WithMockClock(t *testing.T) { checker, err := New(checkFunc, WithClock(mockClock), - WithOfflineCheckInterval(30*time.Second), WithOnOnline(onlineNotify), ) require.NoError(t, err) @@ -305,7 +297,6 @@ func TestConnectivityChecker_TriggerCheck_WithMockClock(t *testing.T) { checker, err := New(checkFunc, WithClock(mockClock), - WithOfflineCheckInterval(30*time.Second), WithOnOnline(onlineNotify), ) require.NoError(t, err) @@ -350,7 +341,6 @@ func TestConnectivityChecker_StateTransitions(t *testing.T) { checker, err := New(checkFunc, WithOnlineCheckInterval(10*time.Millisecond), // Very short interval to allow quick trigger WithOfflineDelay(100*time.Millisecond), // Short delay for testing - WithOfflineCheckInterval(20*time.Millisecond), ) require.NoError(t, err) checker.Start() @@ -393,8 +383,7 @@ func TestConnectivityChecker_Callbacks(t *testing.T) { checker, err := New(checkFunc, WithOnlineCheckInterval(10*time.Millisecond), // Very short interval - WithOfflineCheckInterval(20*time.Millisecond), - WithOfflineDelay(100*time.Millisecond), // Short delay for testing + WithOfflineDelay(100*time.Millisecond), // Short delay for testing WithOnOnline(onlineNotify), WithOnOffline(offlineNotify), ) @@ -448,7 +437,6 @@ func TestConnectivityChecker_Callbacks(t *testing.T) { checker, err := New(checkFunc, WithOnOnline(safeCallback), WithOnOffline(safeCallback), - WithOfflineCheckInterval(10*time.Millisecond), WithOfflineDelay(50*time.Millisecond), ) require.NoError(t, err) @@ -478,9 +466,7 @@ func TestConnectivityChecker_EdgeCases(t *testing.T) { return false } - checker, err := New(checkFunc, - WithOfflineCheckInterval(10*time.Millisecond), - ) + checker, err := New(checkFunc) require.NoError(t, err) checker.Start() @@ -578,7 +564,6 @@ func TestConnectivityChecker_Options(t *testing.T) { checker, err := New(checkFunc, WithOnlineCheckInterval(10*time.Millisecond), // Very short interval WithOfflineDelay(100*time.Millisecond), // Short delay for testing - WithOfflineCheckInterval(20*time.Millisecond), WithOnOffline(offlineNotify), ) require.NoError(t, err) @@ -623,7 +608,6 @@ func TestConnectivityChecker_Options(t *testing.T) { checker, err := New(checkFunc, WithOnlineCheckInterval(10*time.Millisecond), // Very short interval WithOfflineDelay(0), - WithOfflineCheckInterval(10*time.Millisecond), WithOnOffline(offlineNotify), ) require.NoError(t, err) diff --git a/provider/internal/connectivity/options.go b/provider/internal/connectivity/options.go index c3a3aa914..a2e5aa5ee 100644 --- a/provider/internal/connectivity/options.go +++ b/provider/internal/connectivity/options.go @@ -8,9 +8,8 @@ import ( ) type config struct { - clock clock.Clock - onlineCheckInterval time.Duration // minimum check interval when online - offlineCheckInterval time.Duration // periodic check frequency when offline + clock clock.Clock + onlineCheckInterval time.Duration // minimum check interval when online offlineDelay time.Duration @@ -32,7 +31,6 @@ type Option func(opt *config) error var DefaultConfig = func(cfg *config) error { cfg.clock = clock.New() cfg.onlineCheckInterval = 1 * time.Minute - cfg.offlineCheckInterval = 5 * time.Minute return nil } @@ -57,18 +55,6 @@ func WithOnlineCheckInterval(d time.Duration) Option { } } -// WithOfflineCheckInterval sets the interval between connectivity checks when -// the node is offline. -func WithOfflineCheckInterval(d time.Duration) Option { - return func(cfg *config) error { - if d <= 0 { - return fmt.Errorf("offline check interval must be positive, got %s", d) - } - cfg.offlineCheckInterval = d - return nil - } -} - func WithOfflineDelay(d time.Duration) Option { return func(cfg *config) error { if d < 0 { diff --git a/provider/options.go b/provider/options.go index b9fa5d652..a36d63485 100644 --- a/provider/options.go +++ b/provider/options.go @@ -21,14 +21,14 @@ const ( // since regions can grow and shrink depending on the network churn. DefaultMaxReprovideDelay = 1 * time.Hour + // DefaultOfflineDelay is the default delay after which a disconnected node + // is considered as Offline. + DefaultOfflineDelay = 2 * time.Hour // DefaultConnectivityCheckOnlineInterval is the default minimum interval for // checking whether the node is still online. Such a check is performed when // a network operation fails, and the ConnectivityCheckOnlineInterval limits // how often such a check is performed. DefaultConnectivityCheckOnlineInterval = 1 * time.Minute - // DefaultConnectivityCheckOfflineInterval is the default interval for - // checking if the offline node has come online again. - DefaultConnectivityCheckOfflineInterval = 5 * time.Minute ) type config struct { @@ -36,9 +36,8 @@ type config struct { reprovideInterval time.Duration maxReprovideDelay time.Duration - offlineDelay time.Duration - connectivityCheckOnlineInterval time.Duration - connectivityCheckOfflineInterval time.Duration + offlineDelay time.Duration + connectivityCheckOnlineInterval time.Duration peerid peer.ID router KadClosestPeersRouter @@ -91,8 +90,8 @@ var DefaultConfig = func(cfg *config) error { cfg.replicationFactor = amino.DefaultBucketSize cfg.reprovideInterval = amino.DefaultReprovideInterval cfg.maxReprovideDelay = DefaultMaxReprovideDelay + cfg.offlineDelay = DefaultOfflineDelay cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval - cfg.connectivityCheckOfflineInterval = DefaultConnectivityCheckOfflineInterval cfg.clock = clock.New() @@ -171,15 +170,6 @@ func WithConnectivityCheckOnlineInterval(d time.Duration) Option { } } -// WithConnectivityCheckOfflineInterval sets the interval for periodically -// checking whether the offline node has come online again. -func WithConnectivityCheckOfflineInterval(d time.Duration) Option { - return func(cfg *config) error { - cfg.connectivityCheckOfflineInterval = d - return nil - } -} - // WithPeerID sets the peer ID of the node running the provider. func WithPeerID(p peer.ID) Option { return func(cfg *config) error { diff --git a/provider/provider.go b/provider/provider.go index 73a022f43..3ba2902e9 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -199,7 +199,6 @@ func New(opts ...Option) (*SweepingProvider, error) { connectivity.WithClock(cfg.clock), connectivity.WithOfflineDelay(cfg.offlineDelay), connectivity.WithOnlineCheckInterval(cfg.connectivityCheckOnlineInterval), - connectivity.WithOfflineCheckInterval(cfg.connectivityCheckOfflineInterval), ) if err != nil { cancelCtx() @@ -897,6 +896,9 @@ func (s *SweepingProvider) handleProvide(force, reprovide bool, keys ...mh.Multi } } + if s.isOffline() { + return ErrOffline + } prefixes, err := s.groupAndScheduleKeysByPrefix(keys, reprovide) if err != nil { return err @@ -971,6 +973,12 @@ func (s *SweepingProvider) groupAndScheduleKeysByPrefix(keys []mh.Multihash, sch return prefixes, nil } +func (s *SweepingProvider) isOffline() bool { + s.avgPrefixLenLk.Lock() + defer s.avgPrefixLenLk.Unlock() + return s.cachedAvgPrefixLen == -1 +} + func (s *SweepingProvider) onOffline() { s.provideQueue.Clear() @@ -980,7 +988,7 @@ func (s *SweepingProvider) onOffline() { } func (s *SweepingProvider) onOnline() { - if !s.approxPrefixLenRunning.TryLock() { + if s.closed() || !s.approxPrefixLenRunning.TryLock() { return } s.wg.Add(1) @@ -1001,7 +1009,7 @@ func (s *SweepingProvider) onOnline() { // This function is guarded by s.lateReprovideRunning, ensuring the function // cannot be called again while it is working on reproviding late regions. func (s *SweepingProvider) catchupPendingWork() { - if !s.lateReprovideRunning.TryLock() { + if s.closed() || !s.lateReprovideRunning.TryLock() { return } s.wg.Add(2) @@ -1358,6 +1366,12 @@ func (s *SweepingProvider) releaseRegionReprovide(prefix bitstr.Key) { // ProvideOnce only sends provider records for the given keys out to the DHT // swarm. It does NOT take the responsibility to reprovide these keys. +// +// Returns an error if the keys couldn't be added to the provide queue. This +// can happen if the provider is closed or if the node is currently Offline +// (either never bootstrapped, or disconnected since more than `OfflineDelay`). +// The schedule and provide queue depend on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error { if s.closed() { return ErrClosed @@ -1369,6 +1383,12 @@ func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error { // already provided in the past. The keys will be periodically reprovided until // StopProviding is called for the same keys or user defined garbage collection // deletes the keys. +// +// Returns an error if the keys couldn't be added to the provide queue. This +// can happen if the provider is closed or if the node is currently Offline +// (either never bootstrapped, or disconnected since more than `OfflineDelay`). +// The schedule and provide queue depend on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error { if s.closed() { return ErrClosed @@ -1424,10 +1444,18 @@ func (s *SweepingProvider) ProvideStatus(key mh.Multihash) (state ProvideState, // AddToSchedule makes sure the prefixes associated with the supplied keys are // scheduled to be reprovided. +// +// Returns an error if the provider is closed or if the node is currently +// Offline (either never bootstrapped, or disconnected since more than +// `OfflineDelay`). The schedule depends on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) AddToSchedule(keys ...mh.Multihash) error { if s.closed() { return ErrClosed } + if s.isOffline() { + return ErrOffline + } _, err := s.groupAndScheduleKeysByPrefix(keys, true) return err } @@ -1439,6 +1467,11 @@ func (s *SweepingProvider) AddToSchedule(keys ...mh.Multihash) error { // This function doesn't remove prefixes that have no keys from the schedule. // This is done automatically during the reprovide operation if a region has no // keys. +// +// Returns an error if the provider is closed or if the node is currently +// Offline (either never bootstrapped, or disconnected since more than +// `OfflineDelay`). The schedule depends on the network size, hence recent +// network connectivity is essential. func (s *SweepingProvider) RefreshSchedule() error { if s.closed() { return ErrClosed diff --git a/provider/provider_test.go b/provider/provider_test.go index f2bdf24f3..887eccf1d 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -604,7 +604,6 @@ func TestHandleReprovide(t *testing.T) { func() bool { return online.Load() }, connectivity.WithClock(mockClock), connectivity.WithOfflineDelay(offlineDelay), - connectivity.WithOfflineCheckInterval(connectivityCheckInterval), connectivity.WithOnlineCheckInterval(connectivityCheckInterval), ) require.NoError(t, err) @@ -800,7 +799,6 @@ func TestProvideOnce(t *testing.T) { WithClock(clk), WithOfflineDelay(offlineDelay), WithConnectivityCheckOnlineInterval(checkInterval), - WithConnectivityCheckOfflineInterval(checkInterval), } prov, err := New(opts...) require.NoError(t, err) @@ -819,7 +817,7 @@ func TestProvideOnce(t *testing.T) { clk.Add(checkInterval) // trigger connectivity check waitUntil(t, func() bool { return prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be online") - // Set the reprovider as offline + // Set the provider as offline online.Store(false) prov.connectivity.TriggerCheck() time.Sleep(5 * time.Millisecond) // wait for connectivity check to finish @@ -833,7 +831,7 @@ func TestProvideOnce(t *testing.T) { require.Equal(t, 1, len(keys)) require.Equal(t, h, keys[0]) - // Set the reprovider as online + // Set the provider as online online.Store(true) clk.Add(checkInterval) // trigger connectivity check time.Sleep(5 * time.Millisecond) // wait for connectivity check to finish @@ -891,7 +889,6 @@ func TestStartProvidingSingle(t *testing.T) { WithClock(mockClock), WithOfflineDelay(offlineDelay), WithConnectivityCheckOnlineInterval(checkInterval), - WithConnectivityCheckOfflineInterval(checkInterval), } prov, err := New(opts...) require.NoError(t, err) @@ -1014,11 +1011,12 @@ func TestStartProvidingMany(t *testing.T) { }), WithClock(mockClock), } - reprovider, err := New(opts...) + prov, err := New(opts...) require.NoError(t, err) - defer reprovider.Close() + defer prov.Close() + time.Sleep(5 * time.Millisecond) // give some time for connectivity to come Online - err = reprovider.StartProviding(true, mhs...) + err = prov.StartProviding(true, mhs...) require.NoError(t, err) waitUntil(t, func() bool { return provideCount.Load() == int32(len(mhs)*replicationFactor) }, 100*time.Millisecond, "waiting for ProvideMany to finish") @@ -1162,7 +1160,6 @@ func TestStartProvidingUnstableNetwork(t *testing.T) { WithClock(mockClock), WithOfflineDelay(offlineDelay), WithConnectivityCheckOnlineInterval(connectivityCheckInterval), - WithConnectivityCheckOfflineInterval(connectivityCheckInterval), } prov, err := New(opts...) require.NoError(t, err) @@ -1309,3 +1306,123 @@ func TestRefreshSchedule(t *testing.T) { require.True(t, ok) } } + +func TestOperationsOffline(t *testing.T) { + pid, err := peer.Decode("12BoooooPEER") + require.NoError(t, err) + + mockClock := clock.NewMock() + checkInterval := time.Second + offlineDelay := time.Minute + + online := atomic.Bool{} // false, start offline + + router := &mockRouter{ + getClosestPeersFunc: func(ctx context.Context, k string) ([]peer.ID, error) { + if online.Load() { + return []peer.ID{pid}, nil + } + return nil, errors.New("offline") + }, + } + opts := []Option{ + WithReprovideInterval(time.Hour), + WithReplicationFactor(1), + WithMaxWorkers(1), + WithDedicatedBurstWorkers(0), + WithDedicatedPeriodicWorkers(0), + WithPeerID(pid), + WithRouter(router), + WithMessageSender(&mockMsgSender{}), + WithSelfAddrs(func() []ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + require.NoError(t, err) + return []ma.Multiaddr{addr} + }), + WithClock(mockClock), + WithOfflineDelay(offlineDelay), + WithConnectivityCheckOnlineInterval(checkInterval), + } + prov, err := New(opts...) + require.NoError(t, err) + defer prov.Close() + + k := random.Multihashes(1)[0] + + // Not bootstrapped yet, OFFLINE + err = prov.ProvideOnce(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(false, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(true, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.RefreshSchedule() + require.ErrorIs(t, err, ErrOffline) + err = prov.AddToSchedule(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) + + online.Store(true) + mockClock.Add(checkInterval) // trigger connectivity check + waitUntil(t, prov.connectivity.IsOnline, 100*time.Millisecond, "connectivity should be online") + + // ONLINE, operations shouldn't error + err = prov.ProvideOnce(k) + require.NoError(t, err) + err = prov.StartProviding(false, k) + require.NoError(t, err) + err = prov.StartProviding(true, k) + require.NoError(t, err) + err = prov.RefreshSchedule() + require.NoError(t, err) + err = prov.AddToSchedule(k) + require.NoError(t, err) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) + + online.Store(false) + mockClock.Add(checkInterval) // trigger connectivity check + prov.connectivity.TriggerCheck() + waitUntil(t, func() bool { return !prov.connectivity.IsOnline() }, 100*time.Millisecond, "connectivity should be offline") + + // DISCONNECTED, operations shoudln't error until node is OFFLINE + err = prov.ProvideOnce(k) + require.NoError(t, err) + err = prov.StartProviding(false, k) + require.NoError(t, err) + err = prov.StartProviding(true, k) + require.NoError(t, err) + err = prov.RefreshSchedule() + require.NoError(t, err) + err = prov.AddToSchedule(k) + require.NoError(t, err) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) + + prov.provideQueue.Enqueue("0000", k) + require.Equal(t, 1, prov.provideQueue.Size()) + mockClock.Add(offlineDelay) + time.Sleep(5 * time.Millisecond) // wait for connectivity check to finish + + // OFFLINE + // Verify that provide queue has been emptied by the onOffline callback + require.True(t, prov.provideQueue.IsEmpty()) + prov.avgPrefixLenLk.Lock() + require.Equal(t, -1, prov.cachedAvgPrefixLen) + prov.avgPrefixLenLk.Unlock() + + // All operations should error again + err = prov.ProvideOnce(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(false, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StartProviding(true, k) + require.ErrorIs(t, err, ErrOffline) + err = prov.RefreshSchedule() + require.ErrorIs(t, err, ErrOffline) + err = prov.AddToSchedule(k) + require.ErrorIs(t, err, ErrOffline) + err = prov.StopProviding(k) // no error for StopProviding + require.NoError(t, err) +}