tapfreighter: exec ChainPorter.resumePendingParcels in new goroutine
Resume any pending parcels in a new goroutine so that we don't delay
returning from the `ChainPorter.Start` method.
ffranr committed Aug 8, 2024
commit 9ac1baacd996c20ce6b9c51fa77b4b84e0d3fe5d
tapfreighter/chain_porter.go (9 additions, 1 deletion)
```diff
@@ -155,7 +155,15 @@ func (p *ChainPorter) Start() error {
 			p.mainEventLoop()
 		}()
 
-		startErr = p.resumePendingParcels()
+		// Resume any pending parcels in a new goroutine so that we
+		// don't delay returning from the Start method. Without this
+		// goroutine the resumePendingParcels method would block on the
+		// buffering constraint of the outboundParcels channel.
+		p.Wg.Add(1)
+		go func() {
+			defer p.Wg.Done()
+			startErr = p.resumePendingParcels()
+		}()
 	})
 
 	return startErr
```
Contributor:
I think `resumePendingParcels()` needs to be modified to receive `Quit` signals?

Like this case:

But here:

`p.outboundParcels <- pendingParcel`
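A minimal, self-contained sketch of the quit-aware send being suggested here; the `sendOrQuit` helper and every name in it are illustrative, not taken from the codebase:

```go
package main

import (
	"errors"
	"fmt"
)

// sendOrQuit is the quit-aware send pattern suggested above: the send
// aborts cleanly once the quit channel is closed. All names here are
// illustrative, not taken from tapfreighter.
func sendOrQuit[T any](out chan<- T, item T, quit <-chan struct{}) error {
	select {
	case out <- item:
		return nil

	case <-quit:
		return errors.New("shutting down, aborting parcel send")
	}
}

func main() {
	out := make(chan int) // unbuffered: a send blocks without a receiver
	quit := make(chan struct{})
	close(quit) // simulate shutdown having been signalled already

	if err := sendOrQuit(out, 1, quit); err != nil {
		fmt.Println("send aborted:", err)
	}
}
```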

Contributor:
Why is it even a problem if we delay startup by waiting for pending parcels? Anything that takes a while (e.g. proof transfer) is done in a goroutine anyway, so I don't see a pressing reason to do things async here.

Also, if we do this in another goroutine, we don't need the buffered channel, as already mentioned by @jharveyb. I think it just makes the behavior less deterministic (e.g. with 9 parcels the goroutine finishes almost immediately, but with 11 it blocks until complete)...
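To make the determinism point concrete, a toy example (not project code): with a buffer of ten, the first ten sends return immediately and the eleventh blocks until something drains the channel.

```go
package main

import "fmt"

func main() {
	// A toy illustration of the buffering point above: sends 1-10
	// complete without any receiver; an 11th send would block here
	// forever, since nothing is draining the channel.
	ch := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		ch <- i // returns immediately, buffer has room
	}
	fmt.Println("buffered sends done, queued:", len(ch))

	// ch <- 11 // would block: buffer full, no active receiver
}
```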

Contributor:
IIUC, the start of each 'subsystem' is sequential, and blocking on any one Start() call would block the rest?

`if err := s.cfg.ChainPorter.Start(); err != nil {`

Even with proof transfer in a goroutine, if we're resuming more than one parcel, I think the transfer process for the first one would block resumption of the second? And these wouldn't happen in parallel.

The concrete case I was thinking of was: "If I have a wide transfer (with many recipients), what happens on restart?"

I think they would attempt to be transferred sequentially, and any issues with proof upload at that point would block the caretaker startup. Maybe I'm wrong about which errors would cause which functions to block though.
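A condensed, self-contained sketch of that sequential startup pattern; the `Starter` interface and the subsystem names are illustrative stand-ins, not the actual server wiring:

```go
package main

import "fmt"

// Starter stands in for the subsystem interface; the concrete
// subsystems below are illustrative, not the real server list.
type Starter interface {
	Start() error
}

type subsystem struct{ name string }

func (s *subsystem) Start() error {
	// If this blocked (e.g. on resuming parcels), every later
	// subsystem's startup would be delayed too.
	fmt.Println("started:", s.name)
	return nil
}

func main() {
	// Subsystems start strictly in order.
	subs := []Starter{
		&subsystem{name: "chain bridge"},
		&subsystem{name: "chain porter"},
		&subsystem{name: "custodian"},
	}
	for _, sub := range subs {
		if err := sub.Start(); err != nil {
			panic(err)
		}
	}
}
```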

Contributor:
We already spin up a goroutine for each individual parcel:

`go p.advanceState(sendPkg, outboundParcel.kit())`

So at startup, because the main event loop is already a goroutine and just starts another one for each parcel, we can feed in the parcels to resume synchronously and block on that.
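A toy, self-contained sketch of that hand-off: the channel, names, and loop body are illustrative; only the one-goroutine-per-parcel shape mirrors the `go p.advanceState(...)` line quoted above.

```go
package main

import (
	"fmt"
	"sync"
)

// The event loop reads parcels from a channel and spawns one goroutine
// per parcel, so a slow parcel never blocks the loop itself, and
// feeding parcels in synchronously stays cheap.
func main() {
	parcels := make(chan int)
	quit := make(chan struct{})
	var wg sync.WaitGroup

	// The "main event loop" goroutine.
	go func() {
		for {
			select {
			case parcel := <-parcels:
				// One goroutine per parcel, mirroring
				// the quoted advanceState call.
				go func(n int) {
					defer wg.Done()
					fmt.Println("advancing parcel", n)
				}(parcel)

			case <-quit:
				return
			}
		}
	}()

	// Feeding parcels synchronously: each send returns as soon as
	// the loop picks the parcel up and hands it off.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		parcels <- i
	}
	wg.Wait()
	close(quit)
}
```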

Contributor:
Ah, so we can handle parcels in parallel then?

In that case, IIUC, the only startup delay would be calling advanceState() for each resumed parcel, which should be fast.

And then we actually don't need to modify the current behavior, and the problem I was thinking of doesn't exist.

I'm not sure if we have an existing test that handles multiple parcels at once; that would be good to have to validate this.
