diff --git a/fn/option.go b/fn/option.go index 3450dac2d..125183d3f 100644 --- a/fn/option.go +++ b/fn/option.go @@ -59,7 +59,7 @@ func (o Option[A]) UnwrapOr(a A) A { return a } -// UnwrapPtr is used to extract a reference to a value from an option, and we +// UnwrapToPtr is used to extract a reference to a value from an option, and we // supply an empty pointer in the case when the Option is empty. func (o Option[A]) UnwrapToPtr() *A { var v *A diff --git a/itest/psbt_test.go b/itest/psbt_test.go index a4c024340..87ec2ce2e 100644 --- a/itest/psbt_test.go +++ b/itest/psbt_test.go @@ -1042,8 +1042,8 @@ func testPsbtMultiSend(t *harnessTest) { ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) defer cancel() - // Now that we have the asset created, we'll make a new node that'll - // serve as the node which'll receive the assets. + // With the asset created, we'll set up a new node that will act as the + // receiver of the transfer. secondTapd := setupTapdHarness( t.t, t, t.lndHarness.Bob, t.universeServer, ) diff --git a/itest/send_test.go b/itest/send_test.go index cbf5d804e..af5696f82 100644 --- a/itest/send_test.go +++ b/itest/send_test.go @@ -16,6 +16,7 @@ import ( wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc" "github.com/lightninglabs/taproot-assets/taprpc/mintrpc" "github.com/lightninglabs/taproot-assets/taprpc/tapdevrpc" + unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/stretchr/testify/require" ) @@ -573,7 +574,8 @@ func testBasicSendPassiveAsset(t *harnessTest) { // testReattemptFailedSendHashmailCourier tests that a failed attempt at // sending an asset proof will be reattempted by the tapd node. This test -// targets the hashmail courier. +// targets the hashmail courier. The proof courier is specified in the test +// list entry. func testReattemptFailedSendHashmailCourier(t *harnessTest) { var ( ctxb = context.Background() @@ -654,11 +656,6 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) { // Simulate a failed attempt at sending the asset proof by stopping // the receiver node. - // - // The receiving tapd node does not return a proof received confirmation - // message via the universe RPC courier. We can simulate a proof - // transfer failure by stopping the courier service directly and not the - // receiving tapd node. require.NoError(t.t, t.tapd.stop(false)) // Send asset and then mine to confirm the associated on-chain tx. @@ -668,33 +665,80 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) { wg.Wait() } -// testReattemptFailedSendUniCourier tests that a failed attempt at -// sending an asset proof will be reattempted by the tapd node. This test -// targets the universe proof courier. -func testReattemptFailedSendUniCourier(t *harnessTest) { +// testReattemptProofTransferOnTapdRestart tests that a failed attempt at +// transferring a transfer output proof to a proof courier will be reattempted +// by the sending tapd node upon restart. This test targets the universe +// courier. +func testReattemptProofTransferOnTapdRestart(t *harnessTest) { var ( ctxb = context.Background() wg sync.WaitGroup ) - // Make a new node which will send the asset to the primary tapd node. - // We expect this node to fail because our send call will time out - // whilst the porter continues to attempt to send the asset. 
+ // For this test we will use the universe server as the proof courier. + proofCourier := t.universeServer + + // Make a new tapd node which will send an asset to a receiving tapd + // node. sendTapd := setupTapdHarness( t.t, t, t.lndHarness.Bob, t.universeServer, func(params *tapdHarnessParams) { params.expectErrExit = true + params.proofCourier = proofCourier }, ) + defer func() { + // Any node that has been started within an itest should be + // explicitly stopped within the same itest. + require.NoError(t.t, sendTapd.stop(!*noDelete)) + }() + + // Use the primary tapd node as the receiver node. + recvTapd := t.tapd + + // Use the sending node to mint an asset for sending. + rpcAssets := MintAssetsConfirmBatch( + t.t, t.lndHarness.Miner.Client, sendTapd, + []*mintrpc.MintAssetRequest{simpleAssets[0]}, + ) + + genInfo := rpcAssets[0].AssetGenesis + + // After minting an asset with the sending node, we need to synchronize + // the Universe state to ensure the receiving node is updated and aware + // of the asset. + t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets)) + + // Create a new address for the receiver node. We will use the universe + // server as the proof courier. + proofCourierAddr := fmt.Sprintf( + "%s://%s", proof.UniverseRpcCourierType, + proofCourier.service.rpcHost(), + ) + t.Logf("Proof courier address: %s", proofCourierAddr) + + recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{ + AssetId: genInfo.AssetId, + Amt: 10, + ProofCourierAddr: proofCourierAddr, + }) + require.NoError(t.t, err) + AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr) + + // Soon we will be attempting to send an asset to the receiver node. We + // want the attempt to fail until we restart the sending node. + // Therefore, we will take the proof courier service offline. + t.Log("Stopping proof courier service") + require.NoError(t.t, proofCourier.Stop()) - // Subscribe to receive asset send events from the sending tapd node. + // Now that the proof courier service is offline, the sending node's + // attempt to transfer the asset proof should fail. + // + // We will soon start the asset transfer process. However, before we + // start, we subscribe to the send events from the sending tapd node so + // that we can be sure that a transfer has been attempted. events := SubscribeSendEvents(t.t, sendTapd) - // Test to ensure that we receive the expected number of backoff wait - // event notifications. - // This test is executed in a goroutine to ensure that we can receive - // the event notification(s) from the tapd node as the rest of the test - // proceeds. wg.Add(1) go func() { defer wg.Done() @@ -712,7 +756,8 @@ func testReattemptFailedSendUniCourier(t *harnessTest) { // Expected number of events is one less than the number of // tries because the first attempt does not count as a backoff // event. - nodeBackoffCfg := t.tapd.clientCfg.HashMailCourier.BackoffCfg + nodeBackoffCfg := + sendTapd.clientCfg.UniverseRpcCourier.BackoffCfg expectedEventCount := nodeBackoffCfg.NumTries - 1 // Context timeout scales with expected number of events. @@ -729,7 +774,108 @@ func testReattemptFailedSendUniCourier(t *harnessTest) { ) }() - // Mint an asset for sending. + // Start asset transfer and then mine to confirm the associated on-chain + // tx. The on-chain tx should be mined successfully, but we expect the + // asset proof transfer to be unsuccessful. 
+ sendResp, _ := sendAssetsToAddr(t, sendTapd, recvAddr) + MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1) + + // Wait to ensure that the asset transfer attempt has been made. + wg.Wait() + + // Stop the sending tapd node. This downtime will give us the + // opportunity to restart the proof courier service. + t.Log("Stopping sending tapd node") + require.NoError(t.t, sendTapd.stop(false)) + + // Restart the proof courier service. + t.Log("Starting proof courier service") + require.NoError(t.t, proofCourier.Start(nil)) + t.Logf("Proof courier address: %s", proofCourier.service.rpcHost()) + + // Ensure that the proof courier address has not changed on restart. + // The port is currently selected opportunistically. + // If the proof courier address has changed, the tap address will be + // stale. + newProofCourierAddr := fmt.Sprintf( + "%s://%s", proof.UniverseRpcCourierType, + proofCourier.service.rpcHost(), + ) + require.Equal(t.t, proofCourierAddr, newProofCourierAddr) + + // Identify the receiver's asset transfer output. + require.Len(t.t, sendResp.Transfer.Outputs, 2) + recvOutput := sendResp.Transfer.Outputs[0] + + // If the script key of the output is local to the sending node, then + // the receiver's output is the second output. + if recvOutput.ScriptKeyIsLocal { + recvOutput = sendResp.Transfer.Outputs[1] + } + + // Formulate a universe key to query the proof courier for the asset + // transfer proof. + uniKey := unirpc.UniverseKey{ + Id: &unirpc.ID{ + Id: &unirpc.ID_AssetId{ + AssetId: genInfo.AssetId, + }, + ProofType: unirpc.ProofType_PROOF_TYPE_TRANSFER, + }, + LeafKey: &unirpc.AssetKey{ + Outpoint: &unirpc.AssetKey_OpStr{ + OpStr: recvOutput.Anchor.Outpoint, + }, + ScriptKey: &unirpc.AssetKey_ScriptKeyBytes{ + ScriptKeyBytes: recvOutput.ScriptKey, + }, + }, + } + + // Ensure that the transfer proof has not reached the proof courier yet. + resp, err := proofCourier.service.QueryProof(ctxb, &uniKey) + require.Nil(t.t, resp) + require.ErrorContains(t.t, err, "no universe proof found") + + // Restart the sending tapd node. The node should reattempt to transfer + // the asset proof to the proof courier. + t.Log("Restarting sending tapd node") + require.NoError(t.t, sendTapd.start(false)) + + require.Eventually(t.t, func() bool { + resp, err = proofCourier.service.QueryProof(ctxb, &uniKey) + return err == nil && resp != nil + }, defaultWaitTimeout, 200*time.Millisecond) + + // TODO(ffranr): Modify the receiver node proof retrieval backoff + // schedule such that we can assert that the transfer fully completes + // in a timely and predictable manner. + // AssertNonInteractiveRecvComplete(t.t, recvTapd, 1) +} + +// testReattemptFailedSendUniCourier tests that a failed attempt at +// sending an asset proof will be reattempted by the tapd node. This test +// targets the universe proof courier. +func testReattemptFailedSendUniCourier(t *harnessTest) { + var ( + ctxb = context.Background() + wg sync.WaitGroup + ) + + // Make a new node which will send the asset to the primary tapd node. + // We expect this node to fail because our send call will time out + // whilst the porter continues to attempt to send the asset. + sendTapd := setupTapdHarness( + t.t, t, t.lndHarness.Bob, t.universeServer, + func(params *tapdHarnessParams) { + params.expectErrExit = true + }, + ) + + // Use the primary tapd node as the receiver node. + recvTapd := t.tapd + + // Use the sending node to mint an asset for sending.
rpcAssets := MintAssetsConfirmBatch( t.t, t.lndHarness.Miner.Client, sendTapd, []*mintrpc.MintAssetRequest{simpleAssets[0]}, ) @@ -737,25 +883,65 @@ func testReattemptFailedSendUniCourier(t *harnessTest) { genInfo := rpcAssets[0].AssetGenesis - // Synchronize the Universe state of the second node, with the main - // node. - t.syncUniverseState(sendTapd, t.tapd, len(rpcAssets)) + // After minting an asset with the sending node, we need to synchronize + // the Universe state to ensure the receiving node is updated and aware + // of the asset. + t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets)) // Create a new address for the receiver node. - recvAddr, err := t.tapd.NewAddr(ctxb, &taprpc.NewAddrRequest{ + recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{ AssetId: genInfo.AssetId, Amt: 10, }) require.NoError(t.t, err) - AssertAddrCreated(t.t, t.tapd, rpcAssets[0], recvAddr) + AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr) + + // Now we will ensure that the expected number of backoff wait event + // notifications are emitted from the sending node. + // + // We identify backoff wait events in a goroutine to ensure that we can + // capture event notifications from the sending node while the main + // test continues. + // + // Subscribe to proof transfer send events from the sending tapd node. + events := SubscribeSendEvents(t.t, sendTapd) + + wg.Add(1) + go func() { + defer wg.Done() + + // Define a target event selector to match the backoff wait + // event. This function selects for a specific event type. + targetEventSelector := func( + event *tapdevrpc.SendAssetEvent) bool { + + return AssertSendEventProofTransferBackoffWaitTypeSend( + t, event, + ) + } + + // Expected number of events is one less than the number of + // tries because the first attempt does not count as a backoff + // event. + nodeBackoffCfg := sendTapd.clientCfg.HashMailCourier.BackoffCfg + expectedEventCount := nodeBackoffCfg.NumTries - 1 + + // Context timeout scales with expected number of events. + timeout := time.Duration(expectedEventCount) * + defaultProofTransferReceiverAckTimeout + + // Allow for some margin for the operations that aren't pure + // waiting on the receiver ACK. + timeout += timeoutMargin + + assertAssetNtfsEvent( + t, events, timeout, targetEventSelector, + expectedEventCount, + ) + }() // Simulate a failed attempt at sending the asset proof by stopping // the proof courier service. - // - // In following the hashmail proof courier protocol, the receiver node - // returns a proof received confirmation message via the courier. - // We can simulate a proof transfer failure by stopping the receiving - // tapd node. The courier service should still be operational. require.NoError(t.t, t.proofCourier.Stop()) // Send asset and then mine to confirm the associated on-chain tx. @@ -765,9 +951,9 @@ func testReattemptFailedSendUniCourier(t *harnessTest) { wg.Wait() } -// testReattemptFailedReceiveUniCourier tests that a failed attempt at -// receiving an asset proof will be reattempted by the receiving tapd node. This -// test targets the universe proof courier. +// testReattemptFailedReceiveUniCourier ensures that a failed attempt to receive +// an asset proof is retried by the receiving tapd node. This test focuses on +// the universe proof courier.
func testReattemptFailedReceiveUniCourier(t *harnessTest) { ctxb := context.Background() diff --git a/itest/test_list_on_test.go b/itest/test_list_on_test.go index 7f6c2c731..ec3fee01d 100644 --- a/itest/test_list_on_test.go +++ b/itest/test_list_on_test.go @@ -92,6 +92,10 @@ var testCases = []*testCase{ name: "reattempt failed send uni courier", test: testReattemptFailedSendUniCourier, }, + { + name: "reattempt proof transfer on tapd restart", + test: testReattemptProofTransferOnTapdRestart, + }, { name: "reattempt failed receive uni courier", test: testReattemptFailedReceiveUniCourier, diff --git a/tapdb/assets_store.go b/tapdb/assets_store.go index 8f962aeec..ce9d231a7 100644 --- a/tapdb/assets_store.go +++ b/tapdb/assets_store.go @@ -2434,25 +2434,14 @@ func insertAssetTransferOutput(ctx context.Context, q ActiveAssetsStore, return fmt.Errorf("unable to insert script key: %w", err) } - // Now we will mark the output proof as undelivered if it is intended - // for a counterpart. - // - // If the transfer output proof is not intended for a counterpart the - // `proofDeliveryComplete` field will be left as NULL. Otherwise, we set - // it to false to indicate that the proof has not been delivered yet. - shouldDeliverProof, err := output.ShouldDeliverProof() - if err != nil { - return fmt.Errorf("unable to determine if proof should be "+ - "delivery for given transfer output: %w", err) - } - + // Marshal the proof delivery complete field to a nullable boolean. var proofDeliveryComplete sql.NullBool - if shouldDeliverProof { + output.ProofDeliveryComplete.WhenSome(func(deliveryComplete bool) { proofDeliveryComplete = sql.NullBool{ - Bool: false, + Bool: deliveryComplete, Valid: true, } - } + }) // Check if position value can be stored in a 32-bit integer. Type cast // if possible, otherwise return an error. @@ -3151,24 +3140,30 @@ func (a *AssetStore) PendingParcels( // QueryParcels returns the set of confirmed or unconfirmed parcels. func (a *AssetStore) QueryParcels(ctx context.Context, anchorTxHash *chainhash.Hash, - pending bool) ([]*tapfreighter.OutboundParcel, error) { + unconfirmedTxOnly bool) ([]*tapfreighter.OutboundParcel, error) { - var transfers []*tapfreighter.OutboundParcel + var ( + outboundParcels []*tapfreighter.OutboundParcel + readOpts = NewAssetStoreReadTx() + ) - readOpts := NewAssetStoreReadTx() dbErr := a.db.ExecTx(ctx, &readOpts, func(q ActiveAssetsStore) error { // Construct transfer query. - transferQuery := TransferQuery{ - UnconfOnly: pending, + // + // Serialise anchor tx hash as bytes if specified. + var anchorTxHashBytes []byte + if anchorTxHash != nil { + anchorTxHashBytes = anchorTxHash[:] } - // Include anchor tx hash if specified. - if anchorTxHash != nil { - transferQuery.AnchorTxHash = anchorTxHash[:] + transferQuery := TransferQuery{ + // If we want unconfirmed transfers only, we set the + // UnconfOnly field to true. + UnconfOnly: unconfirmedTxOnly, + AnchorTxHash: anchorTxHashBytes, } - // If we want every unconfirmed transfer, then we only pass in - // the UnconfOnly field. + // Query for asset transfers. dbTransfers, err := q.QueryAssetTransfers(ctx, transferQuery) if err != nil { return err @@ -3177,6 +3172,7 @@ func (a *AssetStore) QueryParcels(ctx context.Context, for idx := range dbTransfers { dbT := dbTransfers[idx] + // Fetch the inputs and outputs for the transfer. 
inputs, err := fetchAssetTransferInputs(ctx, q, dbT.ID) if err != nil { return fmt.Errorf("unable to fetch transfer "+ @@ -3192,7 +3188,8 @@ func (a *AssetStore) QueryParcels(ctx context.Context, } // We know that the anchor transaction is the same for - // each output, we can just fetch the first. + // each output. Therefore, we use the first output to + // fetch the transfer's anchor transaction. if len(outputs) == 0 { return fmt.Errorf("no outputs for transfer") } @@ -3213,7 +3210,7 @@ func (a *AssetStore) QueryParcels(ctx context.Context, "anchor tx: %w", err) } - transfer := &tapfreighter.OutboundParcel{ + parcel := &tapfreighter.OutboundParcel{ AnchorTx: anchorTx, AnchorTxHeightHint: uint32(dbT.HeightHint), TransferTime: dbT.TransferTimeUnix.UTC(), @@ -3221,7 +3218,7 @@ func (a *AssetStore) QueryParcels(ctx context.Context, Inputs: inputs, Outputs: outputs, } - transfers = append(transfers, transfer) + outboundParcels = append(outboundParcels, parcel) } return nil @@ -3230,7 +3227,7 @@ func (a *AssetStore) QueryParcels(ctx context.Context, return nil, dbErr } - return transfers, nil + return outboundParcels, nil } // ErrAssetMetaNotFound is returned when an asset meta is not found in the diff --git a/tapdb/multiverse.go b/tapdb/multiverse.go index 9dd3e338f..85a023272 100644 --- a/tapdb/multiverse.go +++ b/tapdb/multiverse.go @@ -841,8 +841,8 @@ func (b *MultiverseStore) RootNodes(ctx context.Context, } // FetchProofLeaf returns a proof leaf for the target key. If the key doesn't -// have a script key specified, then all the proof leafs for the minting -// outpoint will be returned. If neither are specified, then all inserted proof +// have a script key specified, then all the proof leafs for the outpoint will +// be returned. If neither are specified, then all inserted proof // leafs will be returned. func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, id universe.Identifier, diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index cd800b108..be5fc0135 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -110,7 +110,9 @@ type ChainPorter struct { cfg *ChainPorterConfig - exportReqs chan Parcel + // outboundParcels is a channel that carries outbound parcels that need + // to be processed by the main porter goroutine. + outboundParcels chan Parcel // subscribers is a map of components that want to be notified on new // events, keyed by their subscription ID. @@ -129,9 +131,9 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter { map[uint64]*fn.EventReceiver[fn.Event], ) return &ChainPorter{ - cfg: cfg, - exportReqs: make(chan Parcel), - subscribers: subscribers, + cfg: cfg, + outboundParcels: make(chan Parcel), + subscribers: subscribers, ContextGuard: &fn.ContextGuard{ DefaultTimeout: tapgarden.DefaultTimeout, Quit: make(chan struct{}), @@ -148,37 +150,103 @@ func (p *ChainPorter) Start() error { // Start the main chain porter goroutine. p.Wg.Add(1) - go p.assetsPorter() + go p.mainEventLoop() - // Identify any pending parcels that need to be resumed and add - // them to the exportReqs channel so they can be processed by - // the main porter goroutine. - ctx, cancel := p.WithCtxQuit() - defer cancel() - outboundParcels, err := p.cfg.ExportLog.PendingParcels(ctx) - if err != nil { - startErr = err - return - } - - // We resume delivery using the normal parcel delivery mechanism - // by converting the outbound parcels into pending parcels. 
- for idx := range outboundParcels { - outboundParcel := outboundParcels[idx] - log.Infof("Attempting to resume delivery for "+ - "anchor_txid=%v", - outboundParcel.AnchorTx.TxHash().String()) - - // At this point the asset porter should be running. - // It should therefore pick up the pending parcels from - // the channel and attempt to deliver them. - p.exportReqs <- NewPendingParcel(outboundParcel) - } + startErr = p.resumePendingParcels() }) return startErr } +// resumePendingParcels attempts to resume delivery for any pending parcels that +// were previously interrupted. This is done by querying the export log for any +// pending parcels and adding them to the outboundParcels channel so they can be +// processed by the main porter goroutine. +func (p *ChainPorter) resumePendingParcels() error { + ctx, cancel := p.WithCtxQuit() + defer cancel() + + outboundParcels, err := p.cfg.ExportLog.PendingParcels(ctx) + if err != nil { + return err + } + + // Return early if there are no pending parcels to resume. + if len(outboundParcels) == 0 { + log.Info("No pending parcels to resume") + return nil + } + + log.Infof("Attempting to resume asset transfer for %d parcels", + len(outboundParcels)) + + // We resume delivery using the normal parcel delivery mechanism by + // converting the outbound parcels into pending parcels. + for idx := range outboundParcels { + outboundParcel := outboundParcels[idx] + + pendingParcel := NewPendingParcel(outboundParcel) + reportPendingParcel(*pendingParcel) + + // At this point the asset porter should be running. It should + // therefore pick up the pending parcels from the channel and + // attempt to deliver them. + p.outboundParcels <- pendingParcel + } + + return nil +} + +// reportPendingParcel logs information about a pending parcel. +func reportPendingParcel(pendingParcel PendingParcel) { + outboundParcel := pendingParcel.pkg().OutboundPkg + + // Formulate a log entry for each transfer output in the pending parcel + // whose proof delivery is still pending. + var outputLogStrings []string + + for idx := range outboundParcel.Outputs { + transferOut := outboundParcel.Outputs[idx] + + // Process only the proof outputs that are pending delivery. + // Skip outputs with proofs that don't need to be delivered to a + // peer (none) or those with proofs already delivered + // (some true). + if transferOut.ProofDeliveryComplete.UnwrapOr(true) { + continue + } + + // Construct a log string for the transfer output. + skBytes := transferOut.ScriptKey.PubKey.SerializeCompressed() + proofCourierAddr := string( + transferOut.ProofCourierAddr, + ) + + outputLog := fmt.Sprintf( + "transfer_output_idx=%d, script_key=%x, "+ + "proof_courier_addr=%s", + idx, skBytes, proofCourierAddr, + ) + outputLogStrings = append( + outputLogStrings, outputLog, + ) + } + + log.Infof("Encountered pending parcel "+ + "(anchor_txid=%v, count_undelivered_proofs=%d)", + outboundParcel.AnchorTx.TxHash().String(), + len(outputLogStrings)) + + // If there are any outputs with pending delivery proofs, we'll log + // them here. + if len(outputLogStrings) > 0 { + perOutputLog := strings.Join(outputLogStrings, "\n") + + log.Debugf("Transfer output(s) with pending delivery "+ + "proofs:\n%v", perOutputLog) + } +} + // Stop signals that the chain porter should gracefully stop.
func (p *ChainPorter) Stop() error { var stopErr error @@ -211,7 +279,7 @@ func (p *ChainPorter) RequestShipment(req Parcel) (*OutboundParcel, error) { return nil, fmt.Errorf("failed to validate parcel: %w", err) } - if !fn.SendOrQuit(p.exportReqs, req, p.Quit) { + if !fn.SendOrQuit(p.outboundParcels, req, p.Quit) { return nil, fmt.Errorf("ChainPorter shutting down") } @@ -239,24 +307,25 @@ func (p *ChainPorter) QueryParcels(ctx context.Context, ) } -// assetsPorter is the main goroutine of the ChainPorter. This takes in incoming +// mainEventLoop is the main goroutine of the ChainPorter. This takes in parcel // requests, and attempt to complete a transfer. A response is sent back to the // caller if a transfer can be completed. Otherwise, an error is returned. -func (p *ChainPorter) assetsPorter() { +func (p *ChainPorter) mainEventLoop() { defer p.Wg.Done() for { select { - case req := <-p.exportReqs: - // The request either has a destination address we want - // to send to, or a send package is already initialized. - sendPkg := req.pkg() + case outboundParcel := <-p.outboundParcels: + // The outbound parcel either has a destination address + // we want to send to, or a send package is already + // initialized. + sendPkg := outboundParcel.pkg() // Advance the state machine for this package as far as // possible in its own goroutine. The status will be // reported through the different channels of the send // package. - go p.advanceState(sendPkg, req.kit()) + go p.advanceState(sendPkg, outboundParcel.kit()) case <-p.Quit: return @@ -653,8 +722,12 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { } if !shouldDeliverProof { - log.Debugf("Not delivering proof for output with "+ - "script key %x", key.SerializeCompressed()) + log.Debugf("Transfer output proof does not require "+ + "delivery (transfer_output_position=%d, "+ + "proof_delivery_status=%v, "+ + "script_key=%x)", out.Position, + out.ProofDeliveryComplete, + key.SerializeCompressed()) return nil } @@ -673,8 +746,9 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { "script key %x", key.SerializeCompressed()) } - log.Debugf("Attempting to deliver proof for script key %x", - key.SerializeCompressed()) + log.Debugf("Attempting to deliver proof (script_key=%x, "+ + "proof_courier_addr=%s)", key.SerializeCompressed(), + out.ProofCourierAddr) proofCourierAddr, err := proof.ParseCourierAddress( string(out.ProofCourierAddr), @@ -1061,6 +1135,9 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { log.Infof("Committing pending parcel to disk") + // Write the parcel to disk as a pending parcel. This step also + // records the transfer details (e.g., reference to the anchor + // transaction ID, transfer outputs and inputs) to the database. err = p.cfg.ExportLog.LogPendingParcel( ctx, parcel, defaultWalletLeaseIdentifier, time.Now().Add(defaultBroadcastCoinLeaseDuration), diff --git a/tapfreighter/interface.go b/tapfreighter/interface.go index 09fb88405..b5f8ebf0c 100644 --- a/tapfreighter/interface.go +++ b/tapfreighter/interface.go @@ -252,6 +252,14 @@ type TransferOutput struct { // ShouldDeliverProof returns true if a proof corresponding to the subject // transfer output should be delivered to a peer. func (out *TransferOutput) ShouldDeliverProof() (bool, error) { + // If proof delivery is already complete (some true), no further + // delivery is needed.
However, if the proof delivery status is + // unset (none), we can't use that status to determine whether proof + // delivery is necessary, as the field may simply not have been set yet. + if out.ProofDeliveryComplete.UnwrapOr(false) { + return false, nil + } + // If the proof courier address is unspecified, we don't need to deliver // a proof. if len(out.ProofCourierAddr) == 0 { diff --git a/tapfreighter/parcel.go b/tapfreighter/parcel.go index fd11432bb..218be2c7a 100644 --- a/tapfreighter/parcel.go +++ b/tapfreighter/parcel.go @@ -604,7 +604,7 @@ func transferOutput(vPkt *tappsbt.VPacket, vOutIdx int, position uint64, return nil, fmt.Errorf("unable to create anchor: %w", err) } - return &TransferOutput{ + out := TransferOutput{ Anchor: *anchor, Type: vOut.Type, ScriptKey: vOut.ScriptKey, @@ -618,7 +618,28 @@ func transferOutput(vPkt *tappsbt.VPacket, vOutIdx int, position uint64, ProofCourierAddr: proofCourierAddrBytes, ScriptKeyLocal: isLocalKey(vOut.ScriptKey), Position: position, - }, nil + } + + // Determine whether an associated proof needs to be delivered to a peer + // based on the currently set fields. + shouldDeliverProof, err := out.ShouldDeliverProof() + if err != nil { + return nil, fmt.Errorf("unable to determine if transfer "+ + "output proof should be delivered to a peer: %w", err) + } + + if shouldDeliverProof { + // Set the `ProofDeliveryComplete` field to `Some(false)` to + // indicate that proof delivery is pending. Once the proof has + // been successfully delivered, this field will be updated to + // `Some(true)`. + // + // If it was determined that the proof should not be delivered, + // the `ProofDeliveryComplete` field would remain `None`. + out.ProofDeliveryComplete = fn.Some(false) + } + + return &out, nil } // outputAnchor creates an Anchor from an anchor transaction and a virtual diff --git a/tapgarden/custodian.go b/tapgarden/custodian.go index 2973fceb8..52df41cc4 100644 --- a/tapgarden/custodian.go +++ b/tapgarden/custodian.go @@ -605,7 +605,7 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint, err) } - log.Debugf("Received proof for: script_key=%x, asset_id=%x", + log.Debugf("Proof received (script_key=%x, asset_id=%x)", scriptKeyBytes, assetID[:]) ctx, cancel = c.CtxBlocking()
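Note on the new ProofDeliveryComplete semantics: this change gives TransferOutput a tri-state delivery status via fn.Option[bool], where None means the proof is not destined for a peer, Some(false) means delivery is pending, and Some(true) means delivery has completed. The sketch below is a minimal standalone illustration (not code from this change) of how that tri-state maps onto the nullable column written by insertAssetTransferOutput, and how the UnwrapOr(true) guard in reportPendingParcel selects only pending outputs. It assumes the fn.Option helpers used elsewhere in this diff (fn.Some, fn.None, WhenSome, UnwrapOr); the helper name marshalDeliveryStatus is hypothetical.

package main

import (
	"database/sql"
	"fmt"

	"github.com/lightninglabs/taproot-assets/fn"
)

// marshalDeliveryStatus mirrors the marshalling in insertAssetTransferOutput:
// a None status leaves the column NULL, while Some(b) is stored as a valid
// boolean. (Hypothetical helper, for illustration only.)
func marshalDeliveryStatus(status fn.Option[bool]) sql.NullBool {
	var nb sql.NullBool
	status.WhenSome(func(deliveryComplete bool) {
		nb = sql.NullBool{Bool: deliveryComplete, Valid: true}
	})
	return nb
}

func main() {
	// None: the proof is not destined for a peer; the column stays NULL.
	fmt.Printf("%+v\n", marshalDeliveryStatus(fn.None[bool]()))

	// Some(false): delivery pending, as set by transferOutput.
	fmt.Printf("%+v\n", marshalDeliveryStatus(fn.Some(false)))

	// reportPendingParcel's guard: UnwrapOr(true) yields false only for
	// Some(false), so only outputs with pending deliveries are logged.
	fmt.Println(fn.Some(true).UnwrapOr(true))   // true  -> skipped
	fmt.Println(fn.None[bool]().UnwrapOr(true)) // true  -> skipped
	fmt.Println(fn.Some(false).UnwrapOr(true))  // false -> logged
}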
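On the send path, ShouldDeliverProof combines the same status with the proof courier address. The sketch below distills only the two conditions visible in this diff (a completed delivery short-circuits to false, and an empty courier address means no peer delivery); the real method performs further checks that the hunk truncates, so treat this as a hedged restatement of the visible rule, not the full implementation.

package main

import (
	"fmt"

	"github.com/lightninglabs/taproot-assets/fn"
)

// shouldDeliver restates the two checks visible in the interface.go hunk:
// Some(true) means the proof was already delivered, and an empty courier
// address means the proof is not destined for a peer. (Simplified; the
// actual ShouldDeliverProof applies additional checks.)
func shouldDeliver(status fn.Option[bool], courierAddr []byte) bool {
	// Some(true): already delivered, nothing further to do. Both None
	// and Some(false) fall through, matching UnwrapOr(false) above.
	if status.UnwrapOr(false) {
		return false
	}

	// No courier address: no peer delivery is required.
	return len(courierAddr) > 0
}

func main() {
	// Hypothetical courier address, shaped like the test's
	// universerpc://<host> value.
	addr := []byte("universerpc://localhost:10029")

	fmt.Println(shouldDeliver(fn.None[bool](), addr)) // true
	fmt.Println(shouldDeliver(fn.Some(false), addr))  // true
	fmt.Println(shouldDeliver(fn.Some(true), addr))   // false
	fmt.Println(shouldDeliver(fn.None[bool](), nil))  // false
}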