Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
docs(vmsync): streamline method docs
  • Loading branch information
powerslider committed Dec 3, 2025
commit 2e6e03396b3704aeedf72dd6b0e568071bddec82
32 changes: 7 additions & 25 deletions graft/coreth/plugin/evm/vmsync/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,16 @@ func (co *Coordinator) Start(ctx context.Context, initial message.Syncable) {
}()
}

// ProcessQueuedBlockOperations finalizes the VM at the current target and processes the
// queued operations in FIFO order. Intended to be called after a target update
// cycle when it's time to process the queued operations.
// ProcessQueuedBlockOperations finalizes the VM and processes queued block operations
// in FIFO order. Called after syncers complete to finalize state and execute deferred operations.
func (co *Coordinator) ProcessQueuedBlockOperations(ctx context.Context) error {
// Check for cancellation before starting finalization phase.
if err := ctx.Err(); err != nil {
return err
}

co.state.Store(int32(StateFinalizing))

if co.callbacks.FinalizeVM != nil {
// Check context again before expensive FinalizeVM operation.
if err := ctx.Err(); err != nil {
co.state.Store(int32(StateAborted))
return err
Expand All @@ -141,51 +138,37 @@ func (co *Coordinator) ProcessQueuedBlockOperations(ctx context.Context) error {
co.state.Store(int32(StateAborted))
return errInvalidTargetType
}
// FinalizeVM should complete atomically. The context is passed for internal
// cancellation checks, but the coordinator expects completion or an error.
if err := co.callbacks.FinalizeVM(ctx, current); err != nil {
co.state.Store(int32(StateAborted))
return err
}
}

// Check context before transitioning to batch execution state.
if err := ctx.Err(); err != nil {
co.state.Store(int32(StateAborted))
return err
}

co.state.Store(int32(StateExecutingBatch))

// Execute queued block operations sequentially. Each operation can be
// cancelled individually, but the batch execution itself is not atomic - partial completion
// is acceptable as operations are idempotent.
if err := co.executeBlockOperationBatch(ctx); err != nil {
// State will be set to StateAborted by finish() when error is returned.
return err
}

return nil
}

// UpdateSyncTarget broadcasts a new target to all updatable syncers.
// It is only valid in the [StateRunning] state. It cannot be called during
// batch execution to prevent removing blocks that are currently being processed.
// Note: no batch execution occurs here. Batches are only executed after
// finalization.
// Note: Syncers manage cancellation themselves through their Sync() contexts.
// UpdateSyncTarget broadcasts a new target to all syncers and removes stale blocks from queue.
// Only valid in [StateRunning] state. Syncers manage cancellation themselves.
func (co *Coordinator) UpdateSyncTarget(newTarget message.Syncable) error {
// Re-check state after each operation to handle concurrent state transitions.
if co.CurrentState() != StateRunning {
return errInvalidState
}
// Respect pivot policy if configured.
if !co.pivot.shouldForward(newTarget.Height()) {
return nil
}

// Re-check state before modifying queue to ensure we're still in Running state
// and not in batch execution (which would cause a race with removeBelowHeight).
// Re-check state before modifying queue to handle concurrent transitions.
if co.CurrentState() != StateRunning {
return errInvalidState
}
Expand Down Expand Up @@ -221,9 +204,8 @@ func (co *Coordinator) CurrentState() State {
return State(co.state.Load())
}

// executeBlockOperationBatch executes all queued block operations in FIFO order.
// Each operation can be cancelled individually, but the batch execution itself
// is not atomic - partial completion is acceptable as operations are idempotent.
// executeBlockOperationBatch executes queued block operations in FIFO order.
// Partial completion is acceptable as operations are idempotent.
func (co *Coordinator) executeBlockOperationBatch(ctx context.Context) error {
operations := co.queue.dequeueBatch()
for i, op := range operations {
Expand Down