Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f8ecb6d
provider: stats
guillaumemichel Sep 5, 2025
34fcc12
use proper struct for stats
guillaumemichel Sep 9, 2025
405e2d9
provide queue stats
guillaumemichel Sep 9, 2025
63b65e7
counting ongoing operations
guillaumemichel Sep 9, 2025
fb00baf
provide counts
guillaumemichel Sep 9, 2025
cb8bca6
floatTimeSeries
guillaumemichel Sep 9, 2025
33d2051
draft: last metrics
guillaumemichel Sep 17, 2025
41c1c07
refactor
guillaumemichel Sep 17, 2025
b513e72
gofmt
guillaumemichel Sep 18, 2025
cfe33ff
tests
guillaumemichel Sep 19, 2025
56a8625
bump reservedpool
guillaumemichel Sep 19, 2025
06ec289
Merge branch 'master' into provider-stats
guillaumemichel Sep 25, 2025
3ec3cf5
Merge branch 'master' into provider-stats
guillaumemichel Sep 26, 2025
ada0917
Merge branch 'master' into provider-stats
guillaumemichel Oct 6, 2025
102a676
make provider inside buffered.SweepingProvider public
guillaumemichel Oct 6, 2025
21c2103
format
guillaumemichel Oct 7, 2025
cb675c4
fix: flaky TestStats
guillaumemichel Oct 7, 2025
45756c5
avg region size metric
guillaumemichel Oct 8, 2025
28b59f8
fix: ongoing provides counter
guillaumemichel Oct 8, 2025
2c5b376
float64 avgPrefixLen
guillaumemichel Oct 9, 2025
84edca4
provider: persist provide queue
guillaumemichel Oct 9, 2025
27946fa
persist & load provide queue to/from datastore
guillaumemichel Oct 9, 2025
5c38378
configure datastore
guillaumemichel Oct 9, 2025
d83a255
test: provide queue persistence after close
guillaumemichel Oct 9, 2025
f4a33fa
limit batch size
guillaumemichel Oct 15, 2025
3888925
addressing review
guillaumemichel Oct 17, 2025
be3bbde
Merge branch 'master' into provider-stats
guillaumemichel Oct 17, 2025
5a08cfc
test: fix edge case
guillaumemichel Oct 17, 2025
ff53652
Merge branch 'provider-stats' into persist-provide-queue
guillaumemichel Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
counting ongoing operations
  • Loading branch information
guillaumemichel committed Sep 19, 2025
commit 63b65e7e45323dca820e41b0720bf2ea01ba1407
23 changes: 21 additions & 2 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type SweepingProvider struct {
addLocalRecord func(mh.Multihash) error

provideCounter metric.Int64Counter
opStats operationStats
}

// New creates a new SweepingProvider instance with the supplied options.
Expand Down Expand Up @@ -1115,7 +1116,8 @@ func (s *SweepingProvider) reprovideLateRegions() {
}

func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash) {
if len(keys) == 0 {
keyCount := len(keys)
if keyCount == 0 {
return
}
addrInfo, ok := s.selfAddrInfo()
Expand All @@ -1124,6 +1126,12 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash)
// provider record.
return
}

s.opStats.ongoingProvides.start(keyCount)
defer func() {
s.opStats.ongoingProvides.finish(keyCount)
}()

if len(keys) <= individualProvideThreshold {
// Don't fully explore the region, execute simple DHT provides for these
// keys. It isn't worth it to fully explore a region for just a few keys.
Expand All @@ -1142,6 +1150,8 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash)
// current provide batch.
extraKeys := s.provideQueue.DequeueMatching(coveredPrefix)
keys = append(keys, extraKeys...)
keyCount += len(extraKeys)
s.opStats.ongoingProvides.addKeys(len(extraKeys))
regions = keyspace.AssignKeysToRegions(regions, keys)

if !s.provideRegions(regions, addrInfo, false, false) {
Expand All @@ -1166,10 +1176,17 @@ func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide b
}
return
}
if len(keys) == 0 {
keyCount := len(keys)
if keyCount == 0 {
logger.Infof("No keys to reprovide for prefix %s", prefix)
return
}

s.opStats.ongoingProvides.start(keyCount)
defer func() {
s.opStats.ongoingProvides.finish(keyCount)
}()

if len(keys) <= individualProvideThreshold {
// Don't fully explore the region, execute simple DHT provides for these
// keys. It isn't worth it to fully explore a region for just a few keys.
Expand Down Expand Up @@ -1213,6 +1230,8 @@ func (s *SweepingProvider) batchReprovide(prefix bitstr.Key, periodicReprovide b
s.reschedulePrefix(prefix)
}
}
s.opStats.ongoingProvides.addKeys(len(keys) - keyCount)
keyCount = len(keys)
}
regions = keyspace.AssignKeysToRegions(regions, keys)

Expand Down
32 changes: 29 additions & 3 deletions provider/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package provider

import (
"context"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p-kad-dht/provider/stats"
Expand Down Expand Up @@ -89,9 +90,10 @@ func (s *SweepingProvider) Stats() stats.Stats {
}

ongoingOps := stats.OngoingOperations{
RegionProvides: 0, // TODO:
KeyProvides: 0, // TODO:
RegionReprovides: 0, // TODO:
RegionProvides: int(s.opStats.ongoingProvides.opCount.Load()),
KeyProvides: int(s.opStats.ongoingProvides.keyCount.Load()),
RegionReprovides: int(s.opStats.ongoingReprovides.opCount.Load()),
KeyReprovides: int(s.opStats.ongoingReprovides.keyCount.Load()),
}

pastOps := stats.PastOperations{
Expand All @@ -118,3 +120,27 @@ func (s *SweepingProvider) Stats() stats.Stats {

return snapshot
}

type operationStats struct {
ongoingProvides ongoingOpStats
ongoingReprovides ongoingOpStats
}

type ongoingOpStats struct {
opCount atomic.Int32
keyCount atomic.Int32
}

func (s *ongoingOpStats) start(keyCount int) {
s.opCount.Add(1)
s.keyCount.Add(int32(keyCount))
}

func (s *ongoingOpStats) addKeys(keyCount int) {
s.keyCount.Add(int32(keyCount))
}

func (s *ongoingOpStats) finish(keyCount int) {
s.opCount.Add(-1)
s.keyCount.Add(-int32(keyCount))
}
1 change: 1 addition & 0 deletions provider/stats/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type OngoingOperations struct {
RegionProvides int `json:"region_provides"`
KeyProvides int `json:"key_provides"`
RegionReprovides int `json:"region_reprovides"`
KeyReprovides int `json:"key_reprovides"`
}

type PastOperations struct {
Expand Down