Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8b2979b
provider: adding provide and reprovide queue
guillaumemichel Jul 24, 2025
4049cfd
Merge branch 'provider' into provider-queues
guillaumemichel Jul 24, 2025
fac928e
provider: network operations
guillaumemichel Jul 24, 2025
87c3d14
add some tests
guillaumemichel Jul 24, 2025
7b681e5
schedule prefix len computations
guillaumemichel Jul 24, 2025
a191032
provider schedule
guillaumemichel Jul 24, 2025
4081f02
addressed review
guillaumemichel Aug 5, 2025
83e6f59
Merge branch 'provider' into provider-queues
guillaumemichel Aug 5, 2025
db73ea4
use go-test/random
guillaumemichel Aug 5, 2025
e0ece0a
satisfy linter
guillaumemichel Aug 5, 2025
039e0a4
Merge branch 'provider-queues' into provider-network-ops
guillaumemichel Aug 5, 2025
47f82bf
Merge branch 'provider' into provider-network-ops
guillaumemichel Aug 5, 2025
c707b63
Merge branch 'provider-network-ops' into provider-prefixlen
guillaumemichel Aug 5, 2025
fd95218
log errors during initial prefix len measurement
guillaumemichel Aug 5, 2025
b117913
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 5, 2025
c8c4bc3
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 5, 2025
fbe5f8a
address review
guillaumemichel Aug 5, 2025
e1ddf61
satisfy linter
guillaumemichel Aug 5, 2025
b64abe5
simplify unscheduleSubsumedPrefixesNoClock
guillaumemichel Aug 8, 2025
486a39c
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 8, 2025
59c9e9a
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 8, 2025
1c13df6
Merge branch 'provider' into provider-schedule
guillaumemichel Aug 13, 2025
0e05081
moved maxPrefixSize const to top
guillaumemichel Aug 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
provider: adding provide and reprovide queue
  • Loading branch information
guillaumemichel committed Jul 24, 2025
commit 8b2979b59d9bb7949c91e84e1226cb083cebfc62
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23.8

require (
github.com/filecoin-project/go-clock v0.1.0
github.com/gammazero/deque v1.0.0
github.com/google/gopacket v1.1.19
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru v1.0.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
Expand Down
116 changes: 116 additions & 0 deletions provider/internal/queue/prefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package queue

import (
"slices"

"github.com/gammazero/deque"
"github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace"
"github.com/probe-lab/go-libdht/kad/key/bit256"
"github.com/probe-lab/go-libdht/kad/key/bitstr"
"github.com/probe-lab/go-libdht/kad/trie"
)

// prefixQueue is a non-thread safe queue storing non overlapping, unique
// prefixes of kademlia keys, in the order they were enqueued.
type prefixQueue struct {
queue deque.Deque[bitstr.Key] // used to preserve the queue order
prefixes *trie.Trie[bitstr.Key, struct{}] // used to track prefixes in the queue
}

// Push adds a prefix to the queue.
//
// If prefix is already in the queue, this is a no-op.
//
// If the queue contains superstrings of the supplied prefix, insert the
// supplied prefix at the position of the first superstring in the queue, and
// remove all superstrings from the queue. The prefixes are consolidated around
// the shortest prefix.
func (q *prefixQueue) Push(prefix bitstr.Key) {
if subtrie, ok := keyspace.FindSubtrie(q.prefixes, prefix); ok {
// Prefix is a prefix of (at least) an existing prefix in the queue.
entriesToRemove := keyspace.AllEntries(subtrie, bit256.ZeroKey())
prefixesToRemove := make([]bitstr.Key, len(entriesToRemove))
for i, entry := range entriesToRemove {
prefixesToRemove[i] = entry.Key
}
// Remove superstrings of `prefix` from the queue
firstRemovedIndex := q.removePrefixesFromQueue(prefixesToRemove)
// Insert `prefix` in the queue at the location of the first removed
// prefix (last in order of deletion).
q.queue.Insert(firstRemovedIndex, prefix)
// Add `prefix` to prefixes trie.
q.prefixes.Add(prefix, struct{}{})
} else if _, ok := keyspace.FindPrefixOfKey(q.prefixes, prefix); !ok {
// No prefixes of `prefix` found in the queue.
q.queue.PushBack(prefix)
q.prefixes.Add(prefix, struct{}{})
}
}

// Pop removes and returns the first prefix from the queue.
func (q *prefixQueue) Pop() (bitstr.Key, bool) {
if q.queue.Len() == 0 {
return bitstr.Key(""), false
}
// Dequeue the first prefix from the queue.
prefix := q.queue.PopFront()
// Remove the prefix from the prefixes trie.
q.prefixes.Remove(prefix)

return prefix, true
}

// Remove removes a prefix or all its superstrings from the queue, if any.
func (q *prefixQueue) Remove(prefix bitstr.Key) bool {
subtrie, ok := keyspace.FindSubtrie(q.prefixes, prefix)
if !ok {
return false
}
entriesToRemove := keyspace.AllEntries(subtrie, bit256.ZeroKey())
prefixesToRemove := make([]bitstr.Key, len(entriesToRemove))
for i, entry := range entriesToRemove {
prefixesToRemove[i] = entry.Key
}
q.removePrefixesFromQueue(prefixesToRemove)
return true
}

// Returns the number of prefixes in the queue.
func (q *prefixQueue) Size() int {
return q.queue.Len()
}

// Clear removes all keys from the queue and returns the number of keys that
// were removed.
func (q *prefixQueue) Clear() int {
size := q.Size()

q.queue.Clear()
*q.prefixes = trie.Trie[bitstr.Key, struct{}]{}

return size
}

// removeSubtrieFromQueue removes all keys in the provided subtrie from q.queue
// and q.prefixes. Returns the position of the first removed key in the queue.
func (q *prefixQueue) removePrefixesFromQueue(prefixes []bitstr.Key) int {
indexes := make([]int, 0, len(prefixes))
for _, prefix := range prefixes {
// Remove elements from the queue that are superstrings of `prefix`.
q.prefixes.Remove(prefix)
// Find indexes of the superstrings in the queue.
index := q.queue.Index(func(element bitstr.Key) bool { return element == prefix })
if index >= 0 {
indexes = append(indexes, index)
}
}
// Sort indexes to remove in descending order so that we can remove them
// without affecting the indexes of the remaining elements.
slices.Sort(indexes)
slices.Reverse(indexes)
// Remove items in the queue that are prefixes of `prefix`
for _, index := range indexes {
q.queue.Remove(index)
}
return indexes[len(indexes)-1] // return the position of the first removed key
}
191 changes: 191 additions & 0 deletions provider/internal/queue/provide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package queue

import (
"sync"

mh "github.com/multiformats/go-multihash"

"github.com/probe-lab/go-libdht/kad/key/bit256"
"github.com/probe-lab/go-libdht/kad/key/bitstr"
"github.com/probe-lab/go-libdht/kad/trie"

"github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace"
)

// ProvideQueue is a thread-safe queue storing multihashes about to be provided
// to a Kademlia DHT, allowing smart batching.
//
// The queue groups keys by their kademlia identifier prefixes, so that keys
// that should be allocated to the same DHT peers are dequeued together from
// the queue, for efficient batch providing.
//
// The insertion order of prefixes is preserved, but not for keys. Inserting
// keys matching a prefix that is already in the queue inserts the keys at the
// position of the existing prefix.
//
// ProvideQueue allows dequeuing the first prefix of the queue, with all
// matching keys or dequeuing all keys matching a requested prefix.
type ProvideQueue struct {
mu sync.Mutex

queue prefixQueue
keys *trie.Trie[bit256.Key, mh.Multihash] // used to store keys in the queue
}

// NewProvideQueue creates a new ProvideQueue instance.
func NewProvideQueue() *ProvideQueue {
return &ProvideQueue{
queue: prefixQueue{prefixes: trie.New[bitstr.Key, struct{}]()},
keys: trie.New[bit256.Key, mh.Multihash](),
}
}

// Enqueue adds the supplied keys to the queue under the given prefix.
//
// If the prefix already sits in the queue, supplied keys join the queue at the
// position of the existing prefix. If the queue contains prefixes that are
// superstrings of the supplied prefix, all keys matching the supplied prefix
// are consolidated at the position of the first matching superstring in the
// queue.
//
// If supplied prefix doesn't exist yet in the queue, add it at the end.
//
// Supplied keys MUST match the supplied prefix.
func (q *ProvideQueue) Enqueue(prefix bitstr.Key, keys ...mh.Multihash) {
if len(keys) == 0 {
return
}
q.mu.Lock()
defer q.mu.Unlock()

// Enqueue the prefix in the queue if required.
q.queue.Push(prefix)

// Add keys to the keys trie.
for _, h := range keys {
q.keys.Add(keyspace.MhToBit256(h), h)
}
}

// Dequeue pops the first prefix of the queue along with all matching keys.
//
// The prefix and keys are removed from the queue. If the queue is empty,
// return false and the empty prefix.
func (q *ProvideQueue) Dequeue() (bitstr.Key, []mh.Multihash, bool) {
q.mu.Lock()
defer q.mu.Unlock()
prefix, ok := q.queue.Pop()
if !ok {
return prefix, nil, false
}

// Get all keys that match the prefix.
subtrie, _ := keyspace.FindSubtrie(q.keys, prefix)
keys := keyspace.AllValues(subtrie, bit256.ZeroKey())

// Remove the keys from the keys trie.
keyspace.PruneSubtrie(q.keys, prefix)

return prefix, keys, true
}

// DequeueMatching returns keys matching the given prefix from the queue.
//
// The keys and prefix are removed from the queue. If the queue is empty, or
// supplied prefix doesn't match any keys, an empty slice is returned.
func (q *ProvideQueue) DequeueMatching(prefix bitstr.Key) []mh.Multihash {
q.mu.Lock()
defer q.mu.Unlock()

subtrie, ok := keyspace.FindSubtrie(q.keys, prefix)
if !ok {
// No keys matching the prefix.
return nil
}
keys := keyspace.AllValues(subtrie, bit256.ZeroKey())

// Remove the keys from the keys trie.
keyspace.PruneSubtrie(q.keys, prefix)

// Remove prefix and its superstrings from queue if any.
removed := q.queue.Remove(prefix)
if !removed {
// prefix and superstrings not in queue.
if shorterPrefix, ok := keyspace.FindPrefixOfKey(q.queue.prefixes, prefix); ok {
// prefix is a superstring of some other shorter prefix in the queue.
// Leave it in the queue, unless the shorter prefix doesn't have any
// matching keys left.
if _, ok := keyspace.FindSubtrie(q.keys, shorterPrefix); !ok {
// No keys matching shorterPrefix, remove shorterPrefix from queue.
q.queue.Remove(shorterPrefix)
}
}
}
return keys
}

// Remove removes the supplied keys from the queue.
//
// If this operation removes the last keys for prefixes in the queue, remove
// the prefixes from the queue.
func (q *ProvideQueue) Remove(keys ...mh.Multihash) {
q.mu.Lock()
defer q.mu.Unlock()

matchingPrefixes := make(map[bitstr.Key]struct{})

// Remove keys from the keys trie.
for _, h := range keys {
k := keyspace.MhToBit256(h)
q.keys.Remove(k)
if prefix, ok := keyspace.FindPrefixOfKey(q.queue.prefixes, k); ok {
// Get the trie leaf matching the key, if any.
matchingPrefixes[prefix] = struct{}{}
}
}

// For matching prefixes, if no more keys are matching, remove them from
// queue.
prefixesToRemove := make([]bitstr.Key, 0)
for prefix := range matchingPrefixes {
if _, ok := keyspace.FindSubtrie(q.keys, prefix); !ok {
prefixesToRemove = append(prefixesToRemove, prefix)
}
}
if len(prefixesToRemove) > 0 {
q.queue.removePrefixesFromQueue(prefixesToRemove)
}
}

// IsEmpty returns true if the queue is empty.
func (q *ProvideQueue) IsEmpty() bool {
q.mu.Lock()
defer q.mu.Unlock()
return q.queue.Size() == 0
}

// Size returns the number of regions containing at least one key in the queue.
func (q *ProvideQueue) Size() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.sizeNoLock()
}

// sizeNoLock returns the number of regions containing at least one key in the
// queue. It assumes the mutex is held already.
func (q *ProvideQueue) sizeNoLock() int {
return q.keys.Size()
}

// Clear removes all keys from the queue and returns the number of keys that
// were removed.
func (q *ProvideQueue) Clear() int {
q.mu.Lock()
defer q.mu.Unlock()
size := q.sizeNoLock()

q.queue.Clear()
*q.keys = trie.Trie[bit256.Key, mh.Multihash]{}

return size
}
Loading
Loading