Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
8b2979b
provider: adding provide and reprovide queue
guillaumemichel Jul 24, 2025
4049cfd
Merge branch 'provider' into provider-queues
guillaumemichel Jul 24, 2025
fac928e
provider: network operations
guillaumemichel Jul 24, 2025
87c3d14
add some tests
guillaumemichel Jul 24, 2025
7b681e5
schedule prefix len computations
guillaumemichel Jul 24, 2025
a191032
provider schedule
guillaumemichel Jul 24, 2025
41fcb41
provider: handleProvide
guillaumemichel Jul 24, 2025
4081f02
addressed review
guillaumemichel Aug 5, 2025
83e6f59
Merge branch 'provider' into provider-queues
guillaumemichel Aug 5, 2025
db73ea4
use go-test/random
guillaumemichel Aug 5, 2025
e0ece0a
satisfy linter
guillaumemichel Aug 5, 2025
039e0a4
Merge branch 'provider-queues' into provider-network-ops
guillaumemichel Aug 5, 2025
47f82bf
Merge branch 'provider' into provider-network-ops
guillaumemichel Aug 5, 2025
c707b63
Merge branch 'provider-network-ops' into provider-prefixlen
guillaumemichel Aug 5, 2025
fd95218
log errors during initial prefix len measurement
guillaumemichel Aug 5, 2025
b117913
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 5, 2025
c8c4bc3
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 5, 2025
fbe5f8a
address review
guillaumemichel Aug 5, 2025
e1ddf61
satisfy linter
guillaumemichel Aug 5, 2025
634601e
Merge branch 'provider-schedule' into provider-handleProvide
guillaumemichel Aug 5, 2025
834802b
address review
guillaumemichel Aug 5, 2025
5fa93f1
provider: explore swarm
guillaumemichel Aug 6, 2025
2889495
provider: batch provide
guillaumemichel Aug 6, 2025
47f65cb
provider: batch reprovide
guillaumemichel Aug 6, 2025
facff54
provider: catchup pending work
guillaumemichel Aug 6, 2025
d1fc1c5
provider: options
guillaumemichel Aug 6, 2025
fc3ea16
provide: handle reprovide
guillaumemichel Aug 6, 2025
2727bba
provider: daemon
guillaumemichel Aug 6, 2025
67f7ffb
provider: integration tests
guillaumemichel Aug 6, 2025
3a97a4f
cancel context of external functions + tests
guillaumemichel Aug 7, 2025
b9f9004
fix panic when adding key to trie if superstring already exists
guillaumemichel Aug 7, 2025
17335b6
address review
guillaumemichel Aug 8, 2025
31a1797
decrease minimal region size from replicationFactor+1 to replicationF…
guillaumemichel Aug 8, 2025
b64abe5
simplify unscheduleSubsumedPrefixesNoClock
guillaumemichel Aug 8, 2025
486a39c
Merge branch 'provider' into provider-prefixlen
guillaumemichel Aug 8, 2025
59c9e9a
Merge branch 'provider-prefixlen' into provider-schedule
guillaumemichel Aug 8, 2025
48a0033
Merge branch 'provider-schedule' into provider-handleProvide
guillaumemichel Aug 8, 2025
425565a
address review
guillaumemichel Aug 8, 2025
6900ec9
Merge branch 'provider-handleProvide' into provider-exploreSwarm
guillaumemichel Aug 8, 2025
1299170
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 8, 2025
4111c0c
Merge branch 'provider-batchProvide' into provider-batchReprovide
guillaumemichel Aug 8, 2025
d318fae
fix test to match region size (now: replicationFactor, before: replic…
guillaumemichel Aug 8, 2025
79e38d9
Merge branch 'provider-exploreSwarm' into provider-batchProvide
guillaumemichel Aug 8, 2025
bbfba3d
Merge branch 'provider-batchProvide' into provider-batchReprovide
guillaumemichel Aug 8, 2025
ca41e17
dequeue outside of go routine
guillaumemichel Aug 8, 2025
52712c5
Merge branch 'provider-batchReprovide' into provider-catchupPendingWork
guillaumemichel Aug 8, 2025
6df6495
Merge branch 'provider-catchupPendingWork' into provider-options
guillaumemichel Aug 8, 2025
489dee6
Merge branch 'provider-options' into provider-handleReprovide
guillaumemichel Aug 8, 2025
4df4b80
Merge branch 'provider-handleReprovide' into provider-run
guillaumemichel Aug 8, 2025
54802b2
Merge branch 'provider-run' into provider-integrationTests
guillaumemichel Aug 8, 2025
c9c7faa
fix tests
guillaumemichel Aug 8, 2025
8ee0ece
close connectivity
guillaumemichel Aug 13, 2025
8bc762d
fixing conflicts
guillaumemichel Aug 13, 2025
1940029
Merge branch 'provider' into provider-run
guillaumemichel Aug 13, 2025
e1af44b
Merge branch 'provider-run' into provider-integrationTests
guillaumemichel Aug 13, 2025
81bf0c2
Merge branch 'provider' into provider-integrationTests
guillaumemichel Aug 13, 2025
b0e0039
fix waitgroup
guillaumemichel Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
provider: options
  • Loading branch information
guillaumemichel committed Aug 6, 2025
commit d1fc1c5b62442aaa915cd0f51a60bea78d59b50c
291 changes: 291 additions & 0 deletions provider/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
package provider

import (
"errors"
"fmt"
"time"

"github.com/filecoin-project/go-clock"
"github.com/libp2p/go-libp2p-kad-dht/amino"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
)

const (
// DefaultMaxReprovideDelay is the default maximum delay allowed when
// reproviding a region. The interval between 2 reprovides of the same region
// is at most ReprovideInterval+MaxReprovideDelay. This variable is necessary
// since regions can grow and shrink depending on the network churn.
DefaultMaxReprovideDelay = 1 * time.Hour

// DefaultConnectivityCheckOnlineInterval is the default minimum interval for
// checking whether the node is still online. Such a check is performed when
// a network operation fails, and the ConnectivityCheckOnlineInterval limits
// how often such a check is performed.
DefaultConnectivityCheckOnlineInterval = 1 * time.Minute
// DefaultConnectivityCheckOfflineInterval is the default interval for
// checking if the offline node has come online again.
DefaultConnectivityCheckOfflineInterval = 5 * time.Minute
)

type config struct {
replicationFactor int
reprovideInterval time.Duration
maxReprovideDelay time.Duration
connectivityCheckOnlineInterval time.Duration
connectivityCheckOfflineInterval time.Duration

peerid peer.ID
router KadClosestPeersRouter

keyStore *datastore.KeyStore

msgSender pb.MessageSender
selfAddrs func() []ma.Multiaddr
addLocalRecord func(mh.Multihash) error

clock clock.Clock

maxWorkers int
dedicatedPeriodicWorkers int
dedicatedBurstWorkers int
maxProvideConnsPerWorker int
}

func (cfg *config) apply(opts ...Option) error {
for i, o := range opts {
if err := o(cfg); err != nil {
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
}
}
return nil
}

func (c *config) validate() error {
if len(c.peerid) == 0 {
return errors.New("reprovider config: peer id is required")
}
if c.router == nil {
return errors.New("reprovider config: router is required")
}
if c.msgSender == nil {
return errors.New("reprovider config: message sender is required")
}
if c.selfAddrs == nil {
return errors.New("reprovider config: self addrs func is required")
}
if c.dedicatedPeriodicWorkers+c.dedicatedBurstWorkers > c.maxWorkers {
return errors.New("reprovider config: total dedicated workers exceed max workers")
}
return nil
}

type Option func(opt *config) error

var DefaultConfig = func(cfg *config) error {
cfg.replicationFactor = amino.DefaultBucketSize
cfg.reprovideInterval = amino.DefaultReprovideInterval
cfg.maxReprovideDelay = DefaultMaxReprovideDelay
cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval
cfg.connectivityCheckOfflineInterval = DefaultConnectivityCheckOfflineInterval

cfg.clock = clock.New()

cfg.maxWorkers = 4
cfg.dedicatedPeriodicWorkers = 2
cfg.dedicatedBurstWorkers = 1
cfg.maxProvideConnsPerWorker = 20

cfg.addLocalRecord = func(mh mh.Multihash) error { return nil }

return nil
}

// WithReplicationFactor sets the replication factor for provider records. It
// means that during provide and reprovide operations, each provider records is
// allocated to the ReplicationFactor closest peers in the DHT swarm.
func WithReplicationFactor(n int) Option {
return func(cfg *config) error {
if n <= 0 {
return errors.New("reprovider config: replication factor must be a positive integer")
}
cfg.replicationFactor = n
return nil
}
}

// WithReprovideInterval sets the interval at which regions are reprovided.
func WithReprovideInterval(d time.Duration) Option {
return func(cfg *config) error {
if d <= 0 {
return errors.New("reprovider config: reprovide interval must be greater than 0")
}
cfg.reprovideInterval = d
return nil
}
}

// WithMaxReprovideDelay sets the maximum delay allowed when reproviding a
// region. The interval between 2 reprovides of the same region is at most
// ReprovideInterval+MaxReprovideDelay.
//
// This parameter is necessary since regions can grow and shrink depending on
// the network churn.
func WithMaxReprovideDelay(d time.Duration) Option {
return func(cfg *config) error {
if d <= 0 {
return errors.New("reprovider config: max reprovide delay must be greater than 0")
}
cfg.maxReprovideDelay = d
return nil
}
}

// WithConnectivityCheckOnlineInterval sets the minimal interval for checking
// whether the node is still online. Such a check is performed when a network
// operation fails, and the ConnectivityCheckOnlineInterval limits how often
// such a check is performed.
func WithConnectivityCheckOnlineInterval(d time.Duration) Option {
return func(cfg *config) error {
cfg.connectivityCheckOnlineInterval = d
return nil
}
}

// WithConnectivityCheckOfflineInterval sets the interval for periodically
// checking whether the offline node has come online again.
func WithConnectivityCheckOfflineInterval(d time.Duration) Option {
return func(cfg *config) error {
cfg.connectivityCheckOfflineInterval = d
return nil
}
}

// WithPeerID sets the peer ID of the node running the provider.
func WithPeerID(p peer.ID) Option {
return func(cfg *config) error {
cfg.peerid = p
return nil
}
}

// WithRouter sets the router used to find closest peers in the DHT.
func WithRouter(r KadClosestPeersRouter) Option {
return func(cfg *config) error {
cfg.router = r
return nil
}
}

// WithMessageSender sets the message sender used to send messages out to the
// DHT swarm.
func WithMessageSender(m pb.MessageSender) Option {
return func(cfg *config) error {
cfg.msgSender = m
return nil
}
}

// WithSelfAddrs sets the function that returns the self addresses of the node.
// These addresses are written in the provider records advertised by the node.
func WithSelfAddrs(f func() []ma.Multiaddr) Option {
return func(cfg *config) error {
cfg.selfAddrs = f
return nil
}
}

// WithAddLocalRecord sets the function that adds a provider record to the
// local provider record store.
func WithAddLocalRecord(f func(mh.Multihash) error) Option {
return func(cfg *config) error {
if f == nil {
return errors.New("reprovider config: add local record function cannot be nil")
}
cfg.addLocalRecord = f
return nil
}
}

// WithClock sets the clock used by the provider. This is useful for testing
// purposes, allowing to control time in tests.
func WithClock(c clock.Clock) Option {
return func(cfg *config) error {
cfg.clock = c
return nil
}
}

// WithMaxWorkers sets the maximum number of workers that can be used for
// provide and reprovide jobs. The job of a worker is to explore a region of
// the keyspace and (re)provide the keys matching the region to the closest
// peers.
//
// You can configure a number of workers dedicated to periodic jobs, and a
// number of workers dedicated to burst jobs. MaxWorkers should be greater or
// equal to DedicatedPeriodicWorkers+DedicatedBurstWorkers. The additional
// workers that aren't dedicated to specific jobs can be used for either job
// type where needed.
func WithMaxWorkers(n int) Option {
return func(cfg *config) error {
if n < 0 {
return errors.New("reprovider config: max workers must be non-negative")
}
cfg.maxWorkers = n
return nil
}
}

// WithDedicatedPeriodicWorkers sets the number of workers dedicated to
// periodic region reprovides.
func WithDedicatedPeriodicWorkers(n int) Option {
return func(cfg *config) error {
if n < 0 {
return errors.New("reprovider config: dedicated periodic workers must be non-negative")
}
cfg.dedicatedPeriodicWorkers = n
return nil
}
}

// WithDedicatedBurstWorkers sets the number of workers dedicated to burst
// operations. Burst operations consist in work that isn't scheduled
// beforehands, such as initial provides and catching up with reproviding after
// the node went offline for a while.
func WithDedicatedBurstWorkers(n int) Option {
return func(cfg *config) error {
if n < 0 {
return errors.New("reprovider config: dedicated burst workers must be non-negative")
}
cfg.dedicatedBurstWorkers = n
return nil
}
}

// WithMaxProvideConnsPerWorker sets the maximum number of connections to
// distinct peers that can be opened by a single worker during a provide
// operation.
func WithMaxProvideConnsPerWorker(n int) Option {
return func(cfg *config) error {
if n <= 0 {
return errors.New("reprovider config: max provide conns per worker must be greater than 0")
}
cfg.maxProvideConnsPerWorker = n
return nil
}
}

// WithKeyStore defines the KeyStore used to keep track of the keys that need
// to be reprovided.
func WithKeyStore(keyStore *datastore.KeyStore) Option {
return func(cfg *config) error {
if keyStore == nil {
return errors.New("reprovider config: multihash store cannot be nil")
}
cfg.keyStore = keyStore
return nil
}
}
Loading
Loading