Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tapfreighter: rename field exportReqs to outboundParcels for clarity
This commit also adds a field doc comment.
  • Loading branch information
ffranr committed Jul 31, 2024
commit 9bb9948d6ac849ffded1fed2db50141789a52a5a
29 changes: 16 additions & 13 deletions tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ type ChainPorter struct {

cfg *ChainPorterConfig

exportReqs chan Parcel
// outboundParcels is a channel that carries outbound parcels that need
// to be processed by the main porter goroutine.
outboundParcels chan Parcel

// subscribers is a map of components that want to be notified on new
// events, keyed by their subscription ID.
Expand All @@ -129,9 +131,9 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter {
map[uint64]*fn.EventReceiver[fn.Event],
)
return &ChainPorter{
cfg: cfg,
exportReqs: make(chan Parcel),
subscribers: subscribers,
cfg: cfg,
outboundParcels: make(chan Parcel),
subscribers: subscribers,
ContextGuard: &fn.ContextGuard{
DefaultTimeout: tapgarden.DefaultTimeout,
Quit: make(chan struct{}),
Expand All @@ -151,8 +153,8 @@ func (p *ChainPorter) Start() error {
go p.mainEventLoop()

// Identify any pending parcels that need to be resumed and add
// them to the exportReqs channel so they can be processed by
// the main porter goroutine.
// them to the outboundParcels channel so they can be processed
// by the main porter goroutine.
ctx, cancel := p.WithCtxQuit()
defer cancel()
outboundParcels, err := p.cfg.ExportLog.PendingParcels(ctx)
Expand All @@ -172,7 +174,7 @@ func (p *ChainPorter) Start() error {
// At this point the asset porter should be running.
// It should therefore pick up the pending parcels from
// the channel and attempt to deliver them.
p.exportReqs <- NewPendingParcel(outboundParcel)
p.outboundParcels <- NewPendingParcel(outboundParcel)
}
})

Expand Down Expand Up @@ -211,7 +213,7 @@ func (p *ChainPorter) RequestShipment(req Parcel) (*OutboundParcel, error) {
return nil, fmt.Errorf("failed to validate parcel: %w", err)
}

if !fn.SendOrQuit(p.exportReqs, req, p.Quit) {
if !fn.SendOrQuit(p.outboundParcels, req, p.Quit) {
return nil, fmt.Errorf("ChainPorter shutting down")
}

Expand Down Expand Up @@ -247,16 +249,17 @@ func (p *ChainPorter) mainEventLoop() {

for {
select {
case req := <-p.exportReqs:
// The request either has a destination address we want
// to send to, or a send package is already initialized.
sendPkg := req.pkg()
case outboundParcel := <-p.outboundParcels:
// The outbound parcel either has a destination address
// we want to send to, or a send package is already
// initialized.
sendPkg := outboundParcel.pkg()

// Advance the state machine for this package as far as
// possible in its own goroutine. The status will be
// reported through the different channels of the send
// package.
go p.advanceState(sendPkg, req.kit())
go p.advanceState(sendPkg, outboundParcel.kit())

case <-p.Quit:
return
Expand Down