Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: flaky TestStats
  • Loading branch information
guillaumemichel committed Oct 7, 2025
commit cb675c4ec89d3273e385bc833fdaef6ce68cf345
48 changes: 34 additions & 14 deletions provider/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,28 @@ func TestStats(t *testing.T) {
return sortedPeers[:min(replicationFactor, len(peers))], nil
},
}
maxWorkers := 12
dedicatedBurstWorkers := 6
dedicatedPeriodicWorkers := 2
maxConnsPerWorker := 3

blocked := false
workersBlocked := make(chan struct{}, maxWorkers*maxConnsPerWorker)
blockedCond := sync.NewCond(&sync.Mutex{})
msgSender := &mockMsgSender{
sendMessageFunc: func(ctx context.Context, p peer.ID, m *pb.Message) error {
blockedCond.L.Lock()
for blocked {
blockedCond.Wait()
if blocked {
// Signal once that this worker is blocked
select {
case workersBlocked <- struct{}{}:
default:
t.Fatal("too many workers blocked")
}
// Wait until unblocked
for blocked {
blockedCond.Wait()
}
}
blockedCond.L.Unlock()
if reachable := peerReachability[p]; !reachable {
Expand All @@ -75,12 +90,7 @@ func TestStats(t *testing.T) {
},
}

maxWorkers := 12
dedicatedBurstWorkers := 6
dedicatedPeriodicWorkers := 2
maxConnsPerWorker := 3
offlineDelay := 10 * time.Minute

startTime := time.Now()

opts := []Option{
Expand Down Expand Up @@ -171,7 +181,11 @@ func TestStats(t *testing.T) {
require.NoError(t, err)
})

synctest.Wait()
// Wait for provide operation to be blocked (single worker)
for range maxConnsPerWorker {
<-workersBlocked
}
require.Empty(t, workersBlocked)

// Check stats during a provide operation (blocked)
stats = prov.Stats()
Expand Down Expand Up @@ -307,8 +321,11 @@ func TestStats(t *testing.T) {
// Sleep until the keys must be reprovided
time.Sleep(time.Until(startTime.Add(reprovideInterval)))

// Run reprovide until it blocks
synctest.Wait()
// Wait for reprovide to be blocked (single worker)
for range maxConnsPerWorker {
<-workersBlocked
}
require.Empty(t, workersBlocked)

stats = prov.Stats()

Expand Down Expand Up @@ -455,11 +472,15 @@ func TestStats(t *testing.T) {
require.NoError(t, err)
})

synctest.Wait()
concurrentProvides := maxWorkers - dedicatedPeriodicWorkers // all workers but the dedicated periodic workers

stats = prov.Stats()
// Wait for the expected number of workers to become blocked
for range concurrentProvides * maxConnsPerWorker {
<-workersBlocked
}
require.Empty(t, workersBlocked)

concurrentProvides := maxWorkers - dedicatedPeriodicWorkers // all workers but the dedicated periodic workers
stats = prov.Stats()

require.False(t, stats.Closed)
require.Equal(t, "online", stats.Connectivity.Status)
Expand Down Expand Up @@ -570,7 +591,6 @@ func TestStats(t *testing.T) {

// Wait a full reprovide cycle for all reprovide to happen
time.Sleep(reprovideInterval)
synctest.Wait()

// Check stats after all regions have been reprovided
stats = prov.Stats()
Expand Down