diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index be5fc0135..ccb575c8d 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -132,7 +132,7 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter { ) return &ChainPorter{ cfg: cfg, - outboundParcels: make(chan Parcel), + outboundParcels: make(chan Parcel, 10), subscribers: subscribers, ContextGuard: &fn.ContextGuard{ DefaultTimeout: tapgarden.DefaultTimeout, @@ -150,9 +150,20 @@ func (p *ChainPorter) Start() error { // Start the main chain porter goroutine. p.Wg.Add(1) - go p.mainEventLoop() + go func() { + defer p.Wg.Done() + p.mainEventLoop() + }() - startErr = p.resumePendingParcels() + // Resume any pending parcels in a new goroutine so that we + // don't delay returning from the Start method. Without this + // goroutine the resumePendingParcels method would block on the + // buffering constraint of the outboundParcels channel. + p.Wg.Add(1) + go func() { + defer p.Wg.Done() + startErr = p.resumePendingParcels() + }() }) return startErr @@ -311,8 +322,6 @@ func (p *ChainPorter) QueryParcels(ctx context.Context, // requests, and attempt to complete a transfer. A response is sent back to the // caller if a transfer can be completed. Otherwise, an error is returned. func (p *ChainPorter) mainEventLoop() { - defer p.Wg.Done() - for { select { case outboundParcel := <-p.outboundParcels: