Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test(vmsync): add unit tests for dynamic state sync flow
  • Loading branch information
powerslider committed Dec 3, 2025
commit 30853a45dfb9dbe44d72fe3cf9c2f3adcf2f925f
76 changes: 76 additions & 0 deletions graft/coreth/plugin/evm/vmsync/block_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestBlockQueue_EnqueueAndDequeue(t *testing.T) {
q := newBlockQueue()

// Nil block should be rejected.
require.False(t, q.enqueue(nil, OpAccept))

// Enqueue blocks.
for i := uint64(100); i < 105; i++ {
require.True(t, q.enqueue(newMockBlock(i), OpAccept))
}

// Dequeue returns all in FIFO order and clears queue.
batch := q.dequeueBatch()
require.Len(t, batch, 5)
for i, op := range batch {
require.Equal(t, uint64(100+i), op.block.GetEthBlock().NumberU64())
}

// Queue is now empty.
require.Empty(t, q.dequeueBatch())
}

func TestBlockQueue_RemoveBelowHeight(t *testing.T) {
q := newBlockQueue()

// Enqueue blocks at heights 100-110.
for i := uint64(100); i <= 110; i++ {
q.enqueue(newMockBlock(i), OpAccept)
}

// Remove blocks at or below height 105.
q.removeBelowHeight(105)

// Only blocks > 105 should remain (106, 107, 108, 109, 110).
batch := q.dequeueBatch()
require.Len(t, batch, 5)
require.Equal(t, uint64(106), batch[0].block.GetEthBlock().NumberU64())
}

func TestBlockQueue_ConcurrentAccess(t *testing.T) {
t.Parallel()

q := newBlockQueue()
const numGoroutines = 10
const numOps = 100

var wg sync.WaitGroup
wg.Add(numGoroutines)

for g := 0; g < numGoroutines; g++ {
go func(id int) {
defer wg.Done()
for i := 0; i < numOps; i++ {
q.enqueue(newMockBlock(uint64(id*numOps+i)), OpAccept)
}
}(g)
}

wg.Wait()

// All operations should have been enqueued.
batch := q.dequeueBatch()
require.Len(t, batch, numGoroutines*numOps)
}
9 changes: 3 additions & 6 deletions graft/coreth/plugin/evm/vmsync/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ type Coordinator struct {
// doneOnce ensures [Callbacks.OnDone] is invoked at most once.
doneOnce sync.Once

// pivot policy to throttle [Coordinator.UpdateSyncTarget] calls.
pivot *pivotPolicy

// pivotInterval configures the pivot policy throttling. 0 disables throttling.
pivotInterval uint64
pivot *pivotPolicy
}

// CoordinatorOption follows the functional options pattern for Coordinator.
Expand All @@ -79,10 +78,7 @@ func NewCoordinator(syncerRegistry *SyncerRegistry, cbs Callbacks, opts ...Coord
syncerRegistry: syncerRegistry,
callbacks: cbs,
}

options.ApplyTo(co, opts...)

co.pivot = newPivotPolicy(co.pivotInterval)
co.state.Store(int32(StateIdle))

return co
Expand All @@ -93,6 +89,7 @@ func NewCoordinator(syncerRegistry *SyncerRegistry, cbs Callbacks, opts ...Coord
func (co *Coordinator) Start(ctx context.Context, initial message.Syncable) {
co.state.Store(int32(StateInitializing))
co.target.Store(initial)
co.pivot = newPivotPolicy(co.pivotInterval)

cctx, cancel := context.WithCancelCause(ctx)
g := co.syncerRegistry.StartAsync(cctx, initial)
Expand Down
135 changes: 135 additions & 0 deletions graft/coreth/plugin/evm/vmsync/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import (
"context"
"errors"
"sync"
"testing"

"github.com/ava-labs/libevm/common"
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
)

func TestCoordinator_StateValidation(t *testing.T) {
co := NewCoordinator(NewSyncerRegistry(), Callbacks{}, WithPivotInterval(1))
block := newMockBlock(100)
target := newTestSyncTarget(100)

// States that reject both operations.
for _, state := range []State{StateIdle, StateInitializing, StateFinalizing, StateCompleted, StateAborted} {
co.state.Store(int32(state))
require.False(t, co.AddBlockOperation(block, OpAccept), "state %d should reject block", state)
require.ErrorIs(t, co.UpdateSyncTarget(target), errInvalidState, "state %d should reject target update", state)
}

// Running: accepts both.
co.state.Store(int32(StateRunning))
require.True(t, co.AddBlockOperation(block, OpAccept))
require.NoError(t, co.UpdateSyncTarget(target))

// ExecutingBatch: accepts blocks, rejects target updates.
co.state.Store(int32(StateExecutingBatch))
require.True(t, co.AddBlockOperation(block, OpAccept))
require.ErrorIs(t, co.UpdateSyncTarget(target), errInvalidState)

// Nil block is always rejected.
co.state.Store(int32(StateRunning))
require.False(t, co.AddBlockOperation(nil, OpAccept))
}

func TestCoordinator_UpdateSyncTarget_RemovesStaleBlocks(t *testing.T) {
co := NewCoordinator(NewSyncerRegistry(), Callbacks{}, WithPivotInterval(1))
co.state.Store(int32(StateRunning))

for i := uint64(100); i <= 110; i++ {
co.AddBlockOperation(newMockBlock(i), OpAccept)
}

require.NoError(t, co.UpdateSyncTarget(newTestSyncTarget(105)))

batch := co.queue.dequeueBatch()
require.Len(t, batch, 5) // Only 106-110 remain.
}

func TestCoordinator_Lifecycle(t *testing.T) {
t.Run("completes successfully", func(t *testing.T) {
registry := NewSyncerRegistry()
require.NoError(t, registry.Register(newMockSyncer("test", nil)))

co, err := runCoordinator(t, registry, Callbacks{
FinalizeVM: func(context.Context, message.Syncable) error { return nil },
})

require.NoError(t, err)
require.Equal(t, StateCompleted, co.CurrentState())
})

t.Run("aborts on syncer error", func(t *testing.T) {
expectedErr := errors.New("syncer failed")
registry := NewSyncerRegistry()
require.NoError(t, registry.Register(newMockSyncer("failing", expectedErr)))

co, err := runCoordinator(t, registry, Callbacks{})

require.ErrorIs(t, err, expectedErr)
require.Equal(t, StateAborted, co.CurrentState())
})
}

func TestCoordinator_ProcessQueuedBlockOperations(t *testing.T) {
t.Run("executes queued operations", func(t *testing.T) {
co := NewCoordinator(NewSyncerRegistry(), Callbacks{})
co.state.Store(int32(StateRunning))
co.target.Store(newTestSyncTarget(100))
co.AddBlockOperation(newMockBlock(100), OpAccept)

require.NoError(t, co.ProcessQueuedBlockOperations(t.Context()))
require.Equal(t, StateExecutingBatch, co.CurrentState())
})

t.Run("returns error on block operation failure", func(t *testing.T) {
co := NewCoordinator(NewSyncerRegistry(), Callbacks{})
co.state.Store(int32(StateRunning))
co.target.Store(newTestSyncTarget(100))

failBlock := newMockBlock(100)
failBlock.acceptErr = errors.New("accept failed")
co.AddBlockOperation(failBlock, OpAccept)

err := co.ProcessQueuedBlockOperations(t.Context())
require.ErrorIs(t, err, errBatchOperationFailed)
})
}

// runCoordinator starts a coordinator and waits for completion.
func runCoordinator(t *testing.T, registry *SyncerRegistry, cbs Callbacks) (*Coordinator, error) {
t.Helper()

var (
errDone error
wg sync.WaitGroup
)
wg.Add(1)

cbs.OnDone = func(err error) {
errDone = err
wg.Done()
}

co := NewCoordinator(registry, cbs)
co.Start(t.Context(), newTestSyncTarget(100))
wg.Wait()

return co, errDone
}

func newTestSyncTarget(height uint64) message.Syncable {
hash := common.BytesToHash([]byte{byte(height)})
root := common.BytesToHash([]byte{byte(height + 1)})
return newSyncTarget(hash, root, height)
}
25 changes: 25 additions & 0 deletions graft/coreth/plugin/evm/vmsync/doubles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,39 @@ package vmsync
import (
"context"
"errors"
"math/big"
"sync"
"time"

"github.com/ava-labs/libevm/core/types"

"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"

syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync"
)

// mockEthBlockWrapper implements EthBlockWrapper for testing.
type mockEthBlockWrapper struct {
ethBlock *types.Block
acceptErr error
rejectErr error
verifyErr error
}

func newMockBlock(height uint64) *mockEthBlockWrapper {
header := &types.Header{Number: new(big.Int).SetUint64(height)}
return &mockEthBlockWrapper{
ethBlock: types.NewBlockWithHeader(header),
}
}

func (m *mockEthBlockWrapper) GetEthBlock() *types.Block { return m.ethBlock }
func (m *mockEthBlockWrapper) Accept(context.Context) error { return m.acceptErr }
func (m *mockEthBlockWrapper) Reject(context.Context) error { return m.rejectErr }
func (m *mockEthBlockWrapper) Verify(context.Context) error { return m.verifyErr }

var _ EthBlockWrapper = (*mockEthBlockWrapper)(nil)

// FuncSyncer adapts a function to the simple Syncer shape used in tests. It is
// useful for defining small, behavior-driven syncers inline.
type FuncSyncer struct {
Expand Down
27 changes: 27 additions & 0 deletions graft/coreth/plugin/evm/vmsync/pivot_policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestPivotPolicy(t *testing.T) {
// Zero interval uses default.
require.Equal(t, defaultPivotInterval, newPivotPolicy(0).interval)

// Test throttling behavior.
p := newPivotPolicy(100)

// First call at 150 initializes threshold to ceil(150/100)*100 = 200
require.False(t, p.shouldForward(150)) // 150 < 200
require.False(t, p.shouldForward(199)) // 199 < 200
require.True(t, p.shouldForward(200)) // 200 >= 200
p.advance() // threshold becomes 300

require.False(t, p.shouldForward(250)) // 250 < 300
require.True(t, p.shouldForward(300)) // 300 >= 300
}