-
Notifications
You must be signed in to change notification settings - Fork 844
Expand file tree
/
Copy pathregistry.go
More file actions
131 lines (106 loc) · 4.18 KB
/
registry.go
File metadata and controls
131 lines (106 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package vmsync
import (
"context"
"errors"
"fmt"
"github.com/ava-labs/libevm/log"
"golang.org/x/sync/errgroup"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync"
)
// errSyncerAlreadyRegistered is the sentinel error wrapped by
// SyncerRegistry.Register when the ID of the syncer being added has already
// been recorded. Match it with errors.Is.
var errSyncerAlreadyRegistered = errors.New("syncer already registered")
// SyncerTask pairs a single syncer with the name used to identify it in log
// and error messages.
type SyncerTask struct {
	name   string         // Label attached to log lines and wrapped errors for this syncer.
	syncer syncpkg.Syncer // The syncer whose Sync, UpdateTarget and Finalize methods are invoked.
}
// SyncerRegistry manages a collection of syncers so they can be started,
// retargeted and finalized as a group. Syncers are kept in registration order;
// StartAsync launches each of them in its own goroutine.
type SyncerRegistry struct {
	syncers         []SyncerTask    // Registered syncers, in the order Register appended them.
	registeredNames map[string]bool // Set of registered syncer IDs (despite the field name), used to reject duplicates.
}
// NewSyncerRegistry returns an empty registry whose duplicate-tracking set is
// already allocated, so it is immediately ready for Register calls.
func NewSyncerRegistry() *SyncerRegistry {
	registry := new(SyncerRegistry)
	registry.registeredNames = make(map[string]bool)
	return registry
}
// Register adds a syncer to the registry.
//
// Syncers are deduplicated by ID (syncer.ID()), not by name: if a syncer with
// the same ID was registered before, an error wrapping
// errSyncerAlreadyRegistered is returned and the registry is left unchanged.
// The syncer's Name() is read once here and stored for use in later log and
// error messages.
//
// Register also works on a zero-value SyncerRegistry; the ID set is allocated
// on first use.
func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error {
	id := syncer.ID()
	if r.registeredNames[id] {
		return fmt.Errorf("%w with id '%s'", errSyncerAlreadyRegistered, id)
	}

	// Reading from a nil map is safe but writing to one panics, so allocate
	// lazily for registries that were not built with NewSyncerRegistry.
	if r.registeredNames == nil {
		r.registeredNames = make(map[string]bool)
	}
	r.registeredNames[id] = true

	// Keyed literal so the construction does not depend on field order.
	r.syncers = append(r.syncers, SyncerTask{name: syncer.Name(), syncer: syncer})
	return nil
}
// RunSyncerTasks starts every registered syncer via StartAsync and blocks
// until all of them have exited.
//
// It returns ctx.Err() without starting anything when ctx is already done
// (e.g., during shutdown), the error reported by the group's Wait() if any
// syncer fails, and nil otherwise.
func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error {
	// Bail out before launching any goroutine if the caller is already shutting down.
	ctxErr := ctx.Err()
	if ctxErr != nil {
		return ctxErr
	}

	waitErr := r.StartAsync(ctx, summary).Wait()
	if waitErr != nil {
		return waitErr
	}

	log.Info(
		"all syncers completed successfully",
		"count", len(r.syncers),
		"summary", summary.GetBlockHash().Hex(),
	)
	return nil
}
// StartAsync launches every registered syncer in its own goroutine and returns
// the [errgroup.Group] tracking them; the group's Wait() completes once all
// syncers have exited.
//
// The syncers run under a context derived from ctx with errgroup.WithContext.
// That derived context is not returned to the caller; it is cancelled when any
// syncer returns an error, propagating shutdown to the others.
//
// When no syncers are registered the group is returned empty and summary is
// not inspected.
func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncable) *errgroup.Group {
	group, groupCtx := errgroup.WithContext(ctx)
	if len(r.syncers) == 0 {
		return group
	}

	// Resolve the summary fields once; every goroutine logs the same values.
	hashHex := summary.GetBlockHash().Hex()
	height := summary.Height()

	for i := range r.syncers {
		// Per-iteration copy so each goroutine closes over its own task.
		task := r.syncers[i]
		group.Go(func() error {
			log.Info("starting syncer", "name", task.name, "summary", hashHex, "height", height)

			err := task.syncer.Sync(groupCtx)
			switch {
			case err == nil:
				log.Info("completed successfully", "name", task.name, "summary", hashHex, "height", height)
				return nil
			case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
				// Context cancellation during shutdown is expected: log at
				// info level and pass the error through unwrapped.
				log.Info("syncer cancelled", "name", task.name, "summary", hashHex, "height", height)
				return err
			default:
				log.Error("failed syncing", "name", task.name, "summary", hashHex, "height", height, "err", err)
				return fmt.Errorf("%s failed: %w", task.name, err)
			}
		})
	}
	return group
}
// UpdateSyncTarget forwards newTarget to each registered syncer, in
// registration order, by calling its UpdateTarget method.
//
// It stops at the first syncer that reports an error and returns that error
// unwrapped; syncers visited earlier keep the new target.
// Note: no context is taken here; syncers manage cancellation themselves
// through their Sync() contexts.
func (r *SyncerRegistry) UpdateSyncTarget(newTarget message.Syncable) error {
	for i := range r.syncers {
		current := &r.syncers[i]

		updateErr := current.syncer.UpdateTarget(newTarget)
		if updateErr != nil {
			log.Error("failed updating sync target", "name", current.name, "err", updateErr)
			return updateErr
		}

		log.Info(
			"updated sync target",
			"name", current.name,
			"new_target", newTarget.GetBlockHash().Hex(),
			"height", newTarget.Height(),
		)
	}
	return nil
}
// FinalizeAll invokes Finalize(ctx) on each registered syncer in registration
// order, stopping at the first failure and returning it wrapped with the
// syncer's name.
//
// It is meant to be called once every syncer's Sync() has completed and before
// the VM state is finalized.
func (r *SyncerRegistry) FinalizeAll(ctx context.Context) error {
	for i := range r.syncers {
		current := &r.syncers[i]

		finalizeErr := current.syncer.Finalize(ctx)
		if finalizeErr != nil {
			log.Error("failed finalizing syncer", "name", current.name, "err", finalizeErr)
			return fmt.Errorf("%s finalize failed: %w", current.name, finalizeErr)
		}

		log.Info("finalized syncer", "name", current.name)
	}
	return nil
}