Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 135 additions & 2 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,19 @@ func (rpc *Client) GetFullBlocks(ctx context.Context, blockNumbers []*big.Int) [
var receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]
wg.Add(2)

// Check if we need special handling for chain ID 296
needsSpecialHandling := rpc.needsChain296SpecialHandling(blockNumbers)

go func() {
defer wg.Done()
result := RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
blocks = result
if needsSpecialHandling {
// For chain 296 with block <= 5780915, get blocks without transactions first
// then fetch transactions separately in batches
blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
} else {
// Normal flow - get blocks with transactions
blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, ctx, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
}
}()

if rpc.supportsBlockReceipts {
Expand Down Expand Up @@ -267,6 +276,11 @@ func (rpc *Client) GetFullBlocks(ctx context.Context, blockNumbers []*big.Int) [

wg.Wait()

// If we used special handling, we need to fetch transactions separately
if needsSpecialHandling {
blocks = rpc.fetchTransactionsForChain296(ctx, blocks)
}

return SerializeFullBlocks(rpc.chainID, blocks, logs, traces, receipts)
}

Expand Down Expand Up @@ -315,3 +329,122 @@ func (rpc *Client) HasCode(ctx context.Context, address string) (bool, error) {
}
return len(code) > 0, nil
}

// needsChain296SpecialHandling checks if we need special handling for chain ID 296
func (rpc *Client) needsChain296SpecialHandling(blockNumbers []*big.Int) bool {
// Check if chain ID is 296
if rpc.chainID == nil || rpc.chainID.Uint64() != 296 {
return false
}

// Check if any block number is <= 5780915 and >= 3853944
threshold2 := big.NewInt(5780915)
threshold1 := big.NewInt(3853944)
for _, blockNum := range blockNumbers {
if blockNum.Cmp(threshold1) > 0 && blockNum.Cmp(threshold2) <= 0 {
return true
}
}
return false
}

// fetchTransactionsForChain296 fetches transactions separately for chain 296 blocks
func (rpc *Client) fetchTransactionsForChain296(ctx context.Context, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]) []RPCFetchBatchResult[*big.Int, common.RawBlock] {
// Collect all transaction hashes from all blocks
var allTxHashes []string
blockTxMap := make(map[string][]string) // maps block number to transaction hashes

for _, blockResult := range blocks {
if blockResult.Error != nil {
continue
}
blockNum := blockResult.Key.String()
var txHashes []string

// Extract transaction hashes from the block
if transactions, exists := blockResult.Result["transactions"]; exists {
if txList, ok := transactions.([]interface{}); ok {
for _, tx := range txList {
if txHash, ok := tx.(string); ok {
txHashes = append(txHashes, txHash)
allTxHashes = append(allTxHashes, txHash)
}
}
}
}
blockTxMap[blockNum] = txHashes
}

if len(allTxHashes) == 0 {
return blocks
}

// Fetch transactions in batches of 100 with parallel processing
transactions := rpc.fetchTransactionsInBatches(ctx, allTxHashes, 100)

// Create a map of transaction hash to transaction data
txMap := make(map[string]interface{})
for _, txResult := range transactions {
if txResult.Error == nil {
txMap[txResult.Key] = txResult.Result
}
}

// Update blocks with transaction data
for i, blockResult := range blocks {
if blockResult.Error != nil {
continue
}
blockNum := blockResult.Key.String()
txHashes := blockTxMap[blockNum]

var blockTransactions []interface{}
for _, txHash := range txHashes {
if txData, exists := txMap[txHash]; exists {
blockTransactions = append(blockTransactions, txData)
}
}

// Update the block with the fetched transactions
blocks[i].Result["transactions"] = blockTransactions
}

return blocks
}

// fetchTransactionsInBatches fetches transactions in batches with parallel processing
func (rpc *Client) fetchTransactionsInBatches(ctx context.Context, txHashes []string, batchSize int) []RPCFetchBatchResult[string, common.RawTransaction] {
if len(txHashes) <= batchSize {
return RPCFetchSingleBatch[string, common.RawTransaction](rpc, ctx, txHashes, "eth_getTransactionByHash", GetTransactionParams)
}

// Split into chunks
chunks := common.SliceToChunks[string](txHashes, batchSize)

log.Debug().Msgf("Fetching %d transactions in %d chunks of max %d requests", len(txHashes), len(chunks), batchSize)

var wg sync.WaitGroup
resultsCh := make(chan []RPCFetchBatchResult[string, common.RawTransaction], len(chunks))

// Process chunks in parallel
for _, chunk := range chunks {
wg.Add(1)
go func(chunk []string) {
defer wg.Done()
resultsCh <- RPCFetchSingleBatch[string, common.RawTransaction](rpc, ctx, chunk, "eth_getTransactionByHash", GetTransactionParams)
}(chunk)
}

go func() {
wg.Wait()
close(resultsCh)
}()

// Collect results
results := make([]RPCFetchBatchResult[string, common.RawTransaction], 0, len(txHashes))
for batchResults := range resultsCh {
results = append(results, batchResults...)
}

return results
}
Loading