From 6e3ffae78c15816c5e283e6dad5413c49335e6f2 Mon Sep 17 00:00:00 2001 From: ffranr Date: Thu, 8 Aug 2024 12:29:14 +0100 Subject: [PATCH 1/3] tapfreighter: reposition WaitGroup.Done() call for better clarity This commit moves the WaitGroup.Done() call closer to the corresponding WaitGroup.Add(1) call. The purpose of this change is to group the goroutine management code together, making it easier to read and reducing the risk of forgetting to decrement the WaitGroup counter. --- tapfreighter/chain_porter.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index be5fc0135..2ac9c8bfa 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -150,7 +150,10 @@ func (p *ChainPorter) Start() error { // Start the main chain porter goroutine. p.Wg.Add(1) - go p.mainEventLoop() + go func() { + defer p.Wg.Done() + p.mainEventLoop() + }() startErr = p.resumePendingParcels() }) @@ -311,8 +314,6 @@ func (p *ChainPorter) QueryParcels(ctx context.Context, // requests, and attempt to complete a transfer. A response is sent back to the // caller if a transfer can be completed. Otherwise, an error is returned. func (p *ChainPorter) mainEventLoop() { - defer p.Wg.Done() - for { select { case outboundParcel := <-p.outboundParcels: From b9b3062d307bc8264d9460194f64a32b22414155 Mon Sep 17 00:00:00 2001 From: ffranr Date: Thu, 8 Aug 2024 12:32:17 +0100 Subject: [PATCH 2/3] tapfreighter: buffer outboundParcels channel to prevent blocking This commit introduces a buffer to the ChainPorter.outboundParcels channel. By adding this buffer, the system can handle new parcels without being blocked by resumed pending parcels, improving overall efficiency and reducing potential delays. --- tapfreighter/chain_porter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 2ac9c8bfa..ad8c3da5c 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -132,7 +132,7 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter { ) return &ChainPorter{ cfg: cfg, - outboundParcels: make(chan Parcel), + outboundParcels: make(chan Parcel, 10), subscribers: subscribers, ContextGuard: &fn.ContextGuard{ DefaultTimeout: tapgarden.DefaultTimeout, From 9ac1baacd996c20ce6b9c51fa77b4b84e0d3fe5d Mon Sep 17 00:00:00 2001 From: ffranr Date: Thu, 8 Aug 2024 12:42:30 +0100 Subject: [PATCH 3/3] tapfreighter: exec ChainPorter.resumePendingParcels in new goroutine Resume any pending parcels in a new goroutine so that we don't delay returning from the `ChainPorter.Start` method. --- tapfreighter/chain_porter.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index ad8c3da5c..ccb575c8d 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -155,7 +155,15 @@ func (p *ChainPorter) Start() error { p.mainEventLoop() }() - startErr = p.resumePendingParcels() + // Resume any pending parcels in a new goroutine so that we + // don't delay returning from the Start method. Without this + // goroutine the resumePendingParcels method would block on the + // buffering constraint of the outboundParcels channel. + p.Wg.Add(1) + go func() { + defer p.Wg.Done() + startErr = p.resumePendingParcels() + }() }) return startErr