Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,5 @@ concurrency:
jobs:
go-test:
uses: ipdxco/unified-github-workflows/.github/workflows/[email protected]
with:
go-versions: '["1.23.x", "1.24.x"]'
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
27 changes: 26 additions & 1 deletion dual/provider/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type config struct {
reprovideInterval [2]time.Duration // [0] = LAN, [1] = WAN
maxReprovideDelay [2]time.Duration

offlineDelay [2]time.Duration
connectivityCheckOnlineInterval [2]time.Duration
connectivityCheckOfflineInterval [2]time.Duration

Expand Down Expand Up @@ -75,8 +76,8 @@ var DefaultConfig = func(cfg *config) error {
cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}
cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}

cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay}
cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}
cfg.connectivityCheckOfflineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOfflineInterval, provider.DefaultConnectivityCheckOfflineInterval}

cfg.maxWorkers = [2]int{4, 4}
cfg.dedicatedPeriodicWorkers = [2]int{2, 2}
Expand Down Expand Up @@ -144,6 +145,30 @@ func WithMaxReprovideDelayWAN(maxReprovideDelay time.Duration) Option {
return withMaxReprovideDelay(maxReprovideDelay, wanID)
}

// withOfflineDelay returns an Option that sets the offline delay for the
// given DHT indexes (lanID and/or wanID). A negative delay is rejected.
func withOfflineDelay(offlineDelay time.Duration, dhts ...uint8) Option {
	return func(cfg *config) error {
		if offlineDelay < 0 {
			return fmt.Errorf("invalid offline delay %s", offlineDelay)
		}
		for i := range dhts {
			cfg.offlineDelay[dhts[i]] = offlineDelay
		}
		return nil
	}
}

// WithOfflineDelay sets the offline delay for both the LAN and WAN providers.
func WithOfflineDelay(offlineDelay time.Duration) Option {
	return withOfflineDelay(offlineDelay, lanID, wanID)
}

// WithOfflineDelayLAN sets the offline delay for the LAN provider only.
func WithOfflineDelayLAN(offlineDelay time.Duration) Option {
	return withOfflineDelay(offlineDelay, lanID)
}

// WithOfflineDelayWAN sets the offline delay for the WAN provider only.
func WithOfflineDelayWAN(offlineDelay time.Duration) Option {
	return withOfflineDelay(offlineDelay, wanID)
}

func withConnectivityCheckOnlineInterval(onlineInterval time.Duration, dhts ...uint8) Option {
return func(cfg *config) error {
if onlineInterval <= 0 {
Expand Down
95 changes: 54 additions & 41 deletions dual/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package provider
import (
"context"
"errors"
"sync/atomic"
"fmt"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-kad-dht/provider"
"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
mh "github.com/multiformats/go-multihash"
)

var logger = logging.Logger(provider.LoggerName)

// SweepingProvider manages provides and reprovides for both DHT swarms (LAN
// and WAN) in the dual DHT setup.
type SweepingProvider struct {
Expand Down Expand Up @@ -60,8 +57,8 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
provider.WithMessageSender(cfg.msgSenders[i]),
provider.WithReprovideInterval(cfg.reprovideInterval[i]),
provider.WithMaxReprovideDelay(cfg.maxReprovideDelay[i]),
provider.WithOfflineDelay(cfg.offlineDelay[i]),
provider.WithConnectivityCheckOnlineInterval(cfg.connectivityCheckOnlineInterval[i]),
provider.WithConnectivityCheckOfflineInterval(cfg.connectivityCheckOfflineInterval[i]),
provider.WithMaxWorkers(cfg.maxWorkers[i]),
provider.WithDedicatedPeriodicWorkers(cfg.dedicatedPeriodicWorkers[i]),
provider.WithDedicatedBurstWorkers(cfg.dedicatedBurstWorkers[i]),
Expand All @@ -83,29 +80,38 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {

// runOnBoth runs the provided function on both the LAN and WAN providers in
// parallel and waits for both to complete.
func (s *SweepingProvider) runOnBoth(wait bool, f func(*provider.SweepingProvider)) {
if wait {
done := make(chan struct{})
go func() {
defer close(done)
f(s.LAN)
}()
f(s.WAN)
<-done
return
func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) error {
var errs [2]error
done := make(chan struct{})
go func() {
defer close(done)
err := f(s.LAN)
if err != nil {
errs[0] = fmt.Errorf("LAN provider: %w", err)
}
}()
err := f(s.WAN)
if err != nil {
errs[1] = fmt.Errorf("WAN provider: %w", err)
}
go f(s.LAN)
go f(s.WAN)
<-done
return errors.Join(errs[:]...)
}

// ProvideOnce sends provider records for the specified keys to both DHT swarms
// only once. It does not automatically reprovide those keys afterward.
//
// Add the supplied multihashes to the provide queue, and return immediately.
// Add the supplied multihashes to the provide queues, and return right after.
// The provide operation happens asynchronously.
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
s.runOnBoth(false, func(p *provider.SweepingProvider) {
p.ProvideOnce(keys...)
//
// Returns an error if the keys couldn't be added to the provide queue. This
// can happen if the provider is closed or if the node is currently Offline
// (either never bootstrapped, or disconnected since more than `OfflineDelay`).
// The schedule and provide queue depend on the network size, hence recent
// network connectivity is essential.
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error {
return s.runOnBoth(func(p *provider.SweepingProvider) error {
return p.ProvideOnce(keys...)
})
}

Expand All @@ -120,23 +126,28 @@ func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
//
// This operation is asynchronous, it returns as soon as the `keys` are added
// to the provide queue, and provides happens asynchronously.
func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
//
// Returns an error if the keys couldn't be added to the provide queue. This
// can happen if the provider is closed or if the node is currently Offline
// (either never bootstrapped, or disconnected since more than `OfflineDelay`).
// The schedule and provide queue depend on the network size, hence recent
// network connectivity is essential.
func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error {
ctx := context.Background()
newKeys, err := s.keyStore.Put(ctx, keys...)
if err != nil {
logger.Warnf("failed to store multihashes: %v", err)
return
return fmt.Errorf("failed to store multihashes: %w", err)
}

s.runOnBoth(false, func(p *provider.SweepingProvider) {
p.AddToSchedule(newKeys...)
s.runOnBoth(func(p *provider.SweepingProvider) error {
return p.AddToSchedule(newKeys...)
})

if !force {
keys = newKeys
}

s.ProvideOnce(keys...)
return s.ProvideOnce(keys...)
}

// StopProviding stops reproviding the given keys to both DHT swarms. The node
Expand All @@ -146,11 +157,12 @@ func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
// Remove the `keys` from the schedule and return immediately. Valid records
// can remain in the DHT swarms up to the provider record TTL after calling
// `StopProviding`.
func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error {
err := s.keyStore.Delete(context.Background(), keys...)
if err != nil {
logger.Warnf("failed to stop providing keys: %s", err)
return fmt.Errorf("failed to stop providing keys: %w", err)
}
return nil
}

// Clear clears the all the keys from the provide queues of both DHTs and
Expand All @@ -159,11 +171,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
// The keys are not deleted from the keystore, so they will continue to be
// reprovided as scheduled.
func (s *SweepingProvider) Clear() int {
var total atomic.Int32
s.runOnBoth(true, func(p *provider.SweepingProvider) {
total.Add(int32(p.Clear()))
})
return int(total.Load())
return s.LAN.Clear() + s.WAN.Clear()
}

// RefreshSchedule scans the KeyStore for any keys that are not currently
Expand All @@ -173,9 +181,14 @@ func (s *SweepingProvider) Clear() int {
// This function doesn't remove prefixes that have no keys from the schedule.
// This is done automatically during the reprovide operation if a region has no
// keys.
func (s *SweepingProvider) RefreshSchedule() {
go s.runOnBoth(false, func(p *provider.SweepingProvider) {
p.RefreshSchedule()
//
// Returns an error if the provider is closed or if the node is currently
// Offline (either never bootstrapped, or disconnected since more than
// `OfflineDelay`). The schedule depends on the network size, hence recent
// network connectivity is essential.
func (s *SweepingProvider) RefreshSchedule() error {
return s.runOnBoth(func(p *provider.SweepingProvider) error {
return p.RefreshSchedule()
})
}

Expand All @@ -187,9 +200,9 @@ var (
// dhtProvider is the interface to ensure that SweepingProvider and
// provider.SweepingProvider share the same interface.
type dhtProvider interface {
	StartProviding(force bool, keys ...mh.Multihash) error
	StopProviding(keys ...mh.Multihash) error
	ProvideOnce(keys ...mh.Multihash) error
	Clear() int
	RefreshSchedule() error
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
module github.com/libp2p/go-libp2p-kad-dht

go 1.23.10
go 1.24

require (
github.com/filecoin-project/go-clock v0.1.0
github.com/gammazero/deque v1.0.0
github.com/google/gopacket v1.1.19
github.com/google/uuid v1.6.0
Expand All @@ -14,7 +13,7 @@ require (
github.com/ipfs/go-datastore v0.8.2
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-log/v2 v2.8.0
github.com/ipfs/go-test v0.2.2
github.com/ipfs/go-test v0.2.3
github.com/libp2p/go-libp2p v0.43.0
github.com/libp2p/go-libp2p-kbucket v0.8.0
github.com/libp2p/go-libp2p-record v0.3.1
Expand Down Expand Up @@ -48,6 +47,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/filecoin-project/go-clock v0.1.0 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.4.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyB
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log/v2 v2.8.0 h1:SptNTPJQV3s5EF4FdrTu/yVdOKfGbDgn1EBZx4til2o=
github.com/ipfs/go-log/v2 v2.8.0/go.mod h1:2LEEhdv8BGubPeSFTyzbqhCqrwqxCbuTNTLWqgNAipo=
github.com/ipfs/go-test v0.2.2 h1:1yjYyfbdt1w93lVzde6JZ2einh3DIV40at4rVoyEcE8=
github.com/ipfs/go-test v0.2.2/go.mod h1:cmLisgVwkdRCnKu/CFZOk2DdhOcwghr5GsHeqwexoRA=
github.com/ipfs/go-test v0.2.3 h1:Z/jXNAReQFtCYyn7bsv/ZqUwS6E7iIcSpJ2CuzCvnrc=
github.com/ipfs/go-test v0.2.3/go.mod h1:QW8vSKkwYvWFwIZQLGQXdkt9Ud76eQXRQ9Ao2H+cA1o=
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
Loading
Loading