Skip to content
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ var (
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
utils.BatchResponseMaxSize,
utils.RPCTxSyncDefaultFlag,
utils.RPCTxSyncMaxFlag,
}

metricsFlags = []cli.Flag{
Expand Down
18 changes: 18 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,18 @@ var (
Value: ethconfig.Defaults.LogQueryLimit,
Category: flags.APICategory,
}
RPCTxSyncDefaultFlag = &cli.DurationFlag{
Name: "rpc.txsync.default",
Usage: "Default timeout for eth_sendRawTransactionSync (e.g. 2s, 500ms)",
Value: ethconfig.Defaults.TxSyncDefaultTimeout,
Category: flags.APICategory,
}
RPCTxSyncMaxFlag = &cli.DurationFlag{
Name: "rpc.txsync.max",
Usage: "Maximum allowed timeout for eth_sendRawTransactionSync (e.g. 5m)",
Value: ethconfig.Defaults.TxSyncMaxTimeout,
Category: flags.APICategory,
}
// Authenticated RPC HTTP settings
AuthListenFlag = &cli.StringFlag{
Name: "authrpc.addr",
Expand Down Expand Up @@ -1717,6 +1729,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(RPCGlobalLogQueryLimit.Name) {
cfg.LogQueryLimit = ctx.Int(RPCGlobalLogQueryLimit.Name)
}
if ctx.IsSet(RPCTxSyncDefaultFlag.Name) {
cfg.TxSyncDefaultTimeout = ctx.Duration(RPCTxSyncDefaultFlag.Name)
}
if ctx.IsSet(RPCTxSyncMaxFlag.Name) {
cfg.TxSyncMaxTimeout = ctx.Duration(RPCTxSyncMaxFlag.Name)
}
if !ctx.Bool(SnapshotFlag.Name) || cfg.SnapshotCache == 0 {
// If snap-sync is requested, this flag is also required
if cfg.SyncMode == ethconfig.SnapSync {
Expand Down
37 changes: 37 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ import (
"github.com/ethereum/go-ethereum/core/txpool/locals"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
Expand Down Expand Up @@ -486,3 +488,38 @@ func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, re
func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (*types.Transaction, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
return b.eth.stateAtTransaction(ctx, block, txIndex, reexec)
}

func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration {
return b.eth.config.TxSyncDefaultTimeout
}

func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration {
return b.eth.config.TxSyncMaxTimeout
}

func (b *EthAPIBackend) SubscribeTransactionReceipts(
txHashes []common.Hash,
out chan<- []*ethapi.ReceiptWithTx,
) event.Subscription {
ch := make(chan core.ChainEvent, 16)
sub := b.eth.blockchain.SubscribeChainEvent(ch)

go func() {
defer sub.Unsubscribe()
for {
select {
case ev, ok := <-ch:
if !ok {
return
}
batch := filters.FilterReceipts(txHashes, ev)
if len(batch) > 0 {
out <- batch
}
case <-sub.Err():
return
}
}
}()
return sub
}
48 changes: 27 additions & 21 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,29 @@ var FullNodeGPO = gasprice.Config{

// Defaults contains default settings for use on the Ethereum main net.
var Defaults = Config{
HistoryMode: history.KeepAll,
SyncMode: SnapSync,
NetworkId: 0, // enable auto configuration of networkID == chainID
TxLookupLimit: 2350000,
TransactionHistory: 2350000,
LogHistory: 2350000,
StateHistory: params.FullImmutabilityThreshold,
DatabaseCache: 512,
TrieCleanCache: 154,
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
LogQueryLimit: 1000,
Miner: miner.DefaultConfig,
TxPool: legacypool.DefaultConfig,
BlobPool: blobpool.DefaultConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
HistoryMode: history.KeepAll,
SyncMode: SnapSync,
NetworkId: 0, // enable auto configuration of networkID == chainID
TxLookupLimit: 2350000,
TransactionHistory: 2350000,
LogHistory: 2350000,
StateHistory: params.FullImmutabilityThreshold,
DatabaseCache: 512,
TrieCleanCache: 154,
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
LogQueryLimit: 1000,
Miner: miner.DefaultConfig,
TxPool: legacypool.DefaultConfig,
BlobPool: blobpool.DefaultConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
TxSyncDefaultTimeout: 20 * time.Second,
TxSyncMaxTimeout: 1 * time.Minute,
}

//go:generate go run github.com/fjl/gencodec -type Config -formats toml -out gen_config.go
Expand Down Expand Up @@ -183,6 +185,10 @@ type Config struct {

// OverrideVerkle (TODO: remove after the fork)
OverrideVerkle *uint64 `toml:",omitempty"`

// EIP-7966: eth_sendRawTransactionSync timeouts
TxSyncDefaultTimeout time.Duration `toml:",omitempty"`
TxSyncMaxTimeout time.Duration `toml:",omitempty"`
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
10 changes: 4 additions & 6 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
Expand Down Expand Up @@ -554,16 +555,13 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
}

// ReceiptWithTx contains a receipt and its corresponding transaction
type ReceiptWithTx struct {
Receipt *types.Receipt
Transaction *types.Transaction
}
type ReceiptWithTx = ethapi.ReceiptWithTx
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to avoid circular dependency


// filterReceipts returns the receipts matching the given criteria
// FilterReceipts returns the receipts matching the given criteria
// In addition to returning receipts, it also returns the corresponding transactions.
// This is because receipts only contain low-level data, while user-facing data
// may require additional information from the Transaction.
func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
func FilterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
var ret []*ReceiptWithTx

receipts := ev.Receipts
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)

// Handle transaction receipts subscriptions when a new block is added
for _, f := range filters[TransactionReceiptsSubscription] {
matchedReceipts := filterReceipts(f.txHashes, ev)
matchedReceipts := FilterReceipts(f.txHashes, ev)
if len(matchedReceipts) > 0 {
f.receipts <- matchedReceipts
}
Expand Down
38 changes: 38 additions & 0 deletions ethclient/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package ethclient

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -696,6 +698,42 @@ func (ec *Client) SendTransaction(ctx context.Context, tx *types.Transaction) er
return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
}

// SendRawTransactionSync submits a signed tx and waits for a receipt (or until
// the optional timeout elapses on the server side). If timeout == 0, the server
// uses its default.
func (ec *Client) SendRawTransactionSync(
ctx context.Context,
tx *types.Transaction,
timeout time.Duration,
) (*types.Receipt, error) {
var buf bytes.Buffer
if err := tx.EncodeRLP(&buf); err != nil {
return nil, err
}
return ec.SendRawTransactionSyncRaw(ctx, buf.Bytes(), timeout)
}

// SendRawTransactionSyncRaw is the low-level variant that takes the raw RLP.
func (ec *Client) SendRawTransactionSyncRaw(
ctx context.Context,
rawTx []byte,
timeout time.Duration,
) (*types.Receipt, error) {
var out *types.Receipt

// Build params: raw bytes as hex, plus optional timeout as hexutil.Uint64
params := []any{hexutil.Bytes(rawTx)}
if timeout > 0 {
t := hexutil.Uint64(timeout.Milliseconds())
params = append(params, t)
}

if err := ec.c.CallContext(ctx, &out, "eth_sendRawTransactionSync", params...); err != nil {
return nil, err
}
return out, nil
}

// RevertErrorData returns the 'revert reason' data of a contract call.
//
// This can be used with CallContract and EstimateGas, and only when the server is Geth.
Expand Down
87 changes: 87 additions & 0 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,93 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil
return SubmitTransaction(ctx, api.b, tx)
}

type ReceiptWithTx struct {
Receipt *types.Receipt
Transaction *types.Transaction
}

// SendRawTransactionSync will add the signed transaction to the transaction pool
// and wait until the transaction has been included in a block and return the receipt, or the timeout.
func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hexutil.Bytes, timeoutMs *hexutil.Uint64) (map[string]interface{}, error) {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(input); err != nil {
return nil, err
}
hash, err := SubmitTransaction(ctx, api.b, tx)
if err != nil {
return nil, err
}

maxTimeout := api.b.RPCTxSyncMaxTimeout()
defaultTimeout := api.b.RPCTxSyncDefaultTimeout()

timeout := defaultTimeout
if timeoutMs != nil && *timeoutMs > 0 {
req := time.Duration(*timeoutMs) * time.Millisecond
if req > maxTimeout {
timeout = maxTimeout
} else {
timeout = req
}
}

receiptCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// Fast path.
if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil {
return r, nil
}

// Subscribe to receipt stream (filtered to this tx)
receipts := make(chan []*ReceiptWithTx, 1)
sub := api.b.SubscribeTransactionReceipts([]common.Hash{hash}, receipts)
defer sub.Unsubscribe()

subErrCh := sub.Err()

for {
select {
case <-receiptCtx.Done():
// Upstream cancellation -> bubble it; otherwise emit our timeout error
if err := ctx.Err(); err != nil {
return nil, err
}
return nil, &txSyncTimeoutError{
msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", timeout),
hash: hash,
}

case err, ok := <-subErrCh:
if !ok || err == nil {
// subscription closed; disable this case
subErrCh = nil
continue
}
return nil, err

case batch := <-receipts:
for _, rwt := range batch {
if rwt == nil || rwt.Receipt == nil || rwt.Receipt.TxHash != hash {
continue
}

if rwt.Receipt.BlockNumber != nil && rwt.Receipt.BlockHash != (common.Hash{}) {
return MarshalReceipt(
rwt.Receipt,
rwt.Receipt.BlockHash,
rwt.Receipt.BlockNumber.Uint64(),
api.signer,
rwt.Transaction,
int(rwt.Receipt.TransactionIndex),
), nil
}
return api.GetTransactionReceipt(receiptCtx, hash)
}
}
}
}

// Sign calculates an ECDSA signature for:
// keccak256("\x19Ethereum Signed Message:\n" + len(message) + message).
//
Expand Down
Loading