Skip to content

Commit 42b2629

Browse files
s7v7nislandsshekhirin
authored andcommitted
core: use atomic type (ethereum#27011)
1 parent 216427c commit 42b2629

File tree

4 files changed

+22
-20
lines changed

4 files changed

+22
-20
lines changed

core/blockchain.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ type BlockChain struct {
174174
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
175175
gcproc time.Duration // Accumulates canonical block processing for trie dumping
176176
lastWrite uint64 // Last block when the state was flushed
177-
flushInterval int64 // Time interval (processing time) after which to flush a state
177+
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
178178
triedb *trie.Database // The database handler for maintaining trie nodes.
179179
stateCache state.Database // State database to reuse between imports (contains state cache)
180180

@@ -215,8 +215,8 @@ type BlockChain struct {
215215

216216
wg sync.WaitGroup //
217217
quit chan struct{} // shutdown signal, closed in Stop.
218-
running int32 // 0 if chain is running, 1 when stopped
219-
procInterrupt int32 // interrupt signaler for block processing
218+
stopping atomic.Bool // false if chain is running, true when stopped
219+
procInterrupt atomic.Bool // interrupt signaler for block processing
220220

221221
engine consensus.Engine
222222
validator Validator // Block and state validator interface
@@ -260,7 +260,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
260260
cacheConfig: cacheConfig,
261261
db: db,
262262
triedb: triedb,
263-
flushInterval: int64(cacheConfig.TrieTimeLimit),
264263
triegc: prque.New[int64, common.Hash](nil),
265264
quit: make(chan struct{}),
266265
chainmu: syncx.NewClosableMutex(),
@@ -273,6 +272,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
273272
engine: engine,
274273
vmConfig: vmConfig,
275274
}
275+
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
276276
bc.forker = NewForkChoice(bc, shouldPreserve)
277277
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
278278
bc.validator = NewBlockValidator(chainConfig, bc, engine)
@@ -916,7 +916,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {
916916
// This method has been exposed to allow tests to stop the blockchain while simulating
917917
// a crash.
918918
func (bc *BlockChain) stopWithoutSaving() {
919-
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
919+
if !bc.stopping.CompareAndSwap(false, true) {
920920
return
921921
}
922922

@@ -998,12 +998,12 @@ func (bc *BlockChain) Stop() {
998998
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
999999
// calling this method.
10001000
func (bc *BlockChain) StopInsert() {
1001-
atomic.StoreInt32(&bc.procInterrupt, 1)
1001+
bc.procInterrupt.Store(true)
10021002
}
10031003

10041004
// insertStopped returns true after StopInsert has been called.
10051005
func (bc *BlockChain) insertStopped() bool {
1006-
return atomic.LoadInt32(&bc.procInterrupt) == 1
1006+
return bc.procInterrupt.Load()
10071007
}
10081008

10091009
func (bc *BlockChain) procFutureBlocks() {
@@ -1382,7 +1382,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
13821382
}
13831383
// Find the next state trie we need to commit
13841384
chosen := current - TriesInMemory
1385-
flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
1385+
flushInterval := time.Duration(bc.flushInterval.Load())
13861386
// If we exceeded time allowance, flush an entire trie to disk
13871387
if bc.gcproc > flushInterval {
13881388
// If the header is missing (canonical chain behind), we're reorging a low
@@ -1735,7 +1735,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
17351735

17361736
// If we have a followup block, run that against the current state to pre-cache
17371737
// transactions and probabilistically some of the account/storage trie nodes.
1738-
var followupInterrupt uint32
1738+
var followupInterrupt atomic.Bool
17391739
if !bc.cacheConfig.TrieCleanNoPrefetch {
17401740
if followup, err := it.peek(); followup != nil && err == nil {
17411741
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
@@ -1744,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
17441744
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
17451745

17461746
blockPrefetchExecuteTimer.Update(time.Since(start))
1747-
if atomic.LoadUint32(&followupInterrupt) == 1 {
1747+
if followupInterrupt.Load() {
17481748
blockPrefetchInterruptMeter.Mark(1)
17491749
}
17501750
}(time.Now(), followup, throwaway)
@@ -1756,15 +1756,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
17561756
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
17571757
if err != nil {
17581758
bc.reportBlock(block, receipts, err)
1759-
atomic.StoreUint32(&followupInterrupt, 1)
1759+
followupInterrupt.Store(true)
17601760
return it.index, err
17611761
}
17621762
ptime := time.Since(pstart)
17631763

17641764
vstart := time.Now()
17651765
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
17661766
bc.reportBlock(block, receipts, err)
1767-
atomic.StoreUint32(&followupInterrupt, 1)
1767+
followupInterrupt.Store(true)
17681768
return it.index, err
17691769
}
17701770
vtime := time.Since(vstart)
@@ -1797,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
17971797
} else {
17981798
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
17991799
}
1800-
atomic.StoreUint32(&followupInterrupt, 1)
1800+
followupInterrupt.Store(true)
18011801
if err != nil {
18021802
return it.index, err
18031803
}
@@ -2497,5 +2497,5 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
24972497
// The interval is in terms of block processing time, not wall clock.
24982498
// It is thread-safe and can be called repeatedly without side effects.
24992499
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
2500-
atomic.StoreInt64(&bc.flushInterval, int64(interval))
2500+
bc.flushInterval.Store(int64(interval))
25012501
}

core/chain_indexer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type ChainIndexer struct {
7575
backend ChainIndexerBackend // Background processor generating the index data content
7676
children []*ChainIndexer // Child indexers to cascade chain updates to
7777

78-
active uint32 // Flag whether the event loop was started
78+
active atomic.Bool // Flag whether the event loop was started
7979
update chan struct{} // Notification channel that headers should be processed
8080
quit chan chan error // Quit channel to tear down running goroutines
8181
ctx context.Context
@@ -166,7 +166,7 @@ func (c *ChainIndexer) Close() error {
166166
errs = append(errs, err)
167167
}
168168
// If needed, tear down the secondary event loop
169-
if atomic.LoadUint32(&c.active) != 0 {
169+
if c.active.Load() {
170170
c.quit <- errc
171171
if err := <-errc; err != nil {
172172
errs = append(errs, err)
@@ -196,7 +196,7 @@ func (c *ChainIndexer) Close() error {
196196
// queue.
197197
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
198198
// Mark the chain indexer as active, requiring an additional teardown
199-
atomic.StoreUint32(&c.active, 1)
199+
c.active.Store(true)
200200

201201
defer sub.Unsubscribe()
202202

core/state_prefetcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse
4747
// Prefetch processes the state changes according to the Ethereum rules by running
4848
// the transaction messages using the statedb, but any changes are discarded. The
4949
// only goal is to pre-cache transaction signatures and state trie nodes.
50-
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
50+
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) {
5151
var (
5252
header = block.Header()
5353
gaspool = new(GasPool).AddGas(block.GasLimit())
@@ -59,7 +59,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
5959
byzantium := p.config.IsByzantium(block.Number())
6060
for i, tx := range block.Transactions() {
6161
// If block precaching was interrupted, abort
62-
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
62+
if interrupt != nil && interrupt.Load() {
6363
return
6464
}
6565
// Convert the transaction into an executable message and pre-cache its sender

core/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package core
1818

1919
import (
20+
"sync/atomic"
21+
2022
"github.com/ethereum/go-ethereum/core/state"
2123
"github.com/ethereum/go-ethereum/core/types"
2224
"github.com/ethereum/go-ethereum/core/vm"
@@ -39,7 +41,7 @@ type Prefetcher interface {
3941
// Prefetch processes the state changes according to the Ethereum rules by running
4042
// the transaction messages using the statedb, but any changes are discarded. The
4143
// only goal is to pre-cache transaction signatures and state trie nodes.
42-
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
44+
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool)
4345
}
4446

4547
// Processor is an interface for processing blocks using a given initial state.

0 commit comments

Comments
 (0)