Skip to content

Commit fe8232a

Browse files
Thegaramcolinlyguo
andauthored
fix(worker): short-circuit l1 message iteration (#525)
* fix(worker): short-circuit l1 message iteration * Update core/rawdb/accessors_l1_message_test.go Co-authored-by: colin <[email protected]> --------- Co-authored-by: colin <[email protected]>
1 parent abcf48f commit fe8232a

File tree

4 files changed

+80
-7
lines changed

4 files changed

+80
-7
lines changed

core/rawdb/accessors_l1_message.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,38 @@ func ReadSyncedL1BlockNumber(db ethdb.Reader) *uint64 {
4343
return &value
4444
}
4545

46+
// WriteHighestSyncedQueueIndex writes the highest synced L1 message queue index to the database.
47+
func WriteHighestSyncedQueueIndex(db ethdb.KeyValueWriter, queueIndex uint64) {
48+
value := big.NewInt(0).SetUint64(queueIndex).Bytes()
49+
50+
if err := db.Put(highestSyncedQueueIndexKey, value); err != nil {
51+
log.Crit("Failed to update highest synced L1 message queue index", "err", err)
52+
}
53+
}
54+
55+
// ReadHighestSyncedQueueIndex retrieves the highest synced L1 message queue index.
56+
func ReadHighestSyncedQueueIndex(db ethdb.Reader) uint64 {
57+
data, err := db.Get(highestSyncedQueueIndexKey)
58+
if err != nil && isNotFoundErr(err) {
59+
return 0
60+
}
61+
if err != nil {
62+
log.Crit("Failed to read highest synced L1 message queue index from database", "err", err)
63+
}
64+
if len(data) == 0 {
65+
return 0
66+
}
67+
68+
number := new(big.Int).SetBytes(data)
69+
if !number.IsUint64() {
70+
log.Crit("Unexpected highest synced L1 block number in database", "number", number)
71+
}
72+
73+
return number.Uint64()
74+
}
75+
4676
// WriteL1Message writes an L1 message to the database.
77+
// We assume that L1 messages are written to DB following their queue index order.
4778
func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) {
4879
bytes, err := rlp.EncodeToBytes(l1Msg)
4980
if err != nil {
@@ -52,6 +83,8 @@ func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) {
5283
if err := db.Put(L1MessageKey(l1Msg.QueueIndex), bytes); err != nil {
5384
log.Crit("Failed to store L1 message", "err", err)
5485
}
86+
87+
WriteHighestSyncedQueueIndex(db, l1Msg.QueueIndex)
5588
}
5689

5790
// WriteL1Messages writes an array of L1 messages to the database.
@@ -91,20 +124,23 @@ func ReadL1Message(db ethdb.Reader, queueIndex uint64) *types.L1MessageTx {
91124
// allows us to iterate over L1 messages in the database. It
92125
// implements an interface similar to ethdb.Iterator.
93126
type L1MessageIterator struct {
94-
inner ethdb.Iterator
95-
keyLength int
127+
inner ethdb.Iterator
128+
keyLength int
129+
maxQueueIndex uint64
96130
}
97131

98132
// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
99133
// all L1 message in the database starting at the provided enqueue index.
100-
func IterateL1MessagesFrom(db ethdb.Iteratee, fromQueueIndex uint64) L1MessageIterator {
134+
func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
101135
start := encodeBigEndian(fromQueueIndex)
102136
it := db.NewIterator(l1MessagePrefix, start)
103137
keyLength := len(l1MessagePrefix) + 8
138+
maxQueueIndex := ReadHighestSyncedQueueIndex(db)
104139

105140
return L1MessageIterator{
106-
inner: it,
107-
keyLength: keyLength,
141+
inner: it,
142+
keyLength: keyLength,
143+
maxQueueIndex: maxQueueIndex,
108144
}
109145
}
110146

@@ -145,7 +181,7 @@ func (it *L1MessageIterator) Release() {
145181
}
146182

147183
// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
148-
func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.L1MessageTx {
184+
func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
149185
msgs := make([]types.L1MessageTx, 0, maxCount)
150186
it := IterateL1MessagesFrom(db, startIndex)
151187
defer it.Release()
@@ -170,6 +206,10 @@ func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.
170206
msgs = append(msgs, msg)
171207
index += 1
172208
count -= 1
209+
210+
if msg.QueueIndex == it.maxQueueIndex {
211+
break
212+
}
173213
}
174214

175215
return msgs

core/rawdb/accessors_l1_message_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func TestReadWriteL1Message(t *testing.T) {
4848
if got == nil || got.QueueIndex != queueIndex {
4949
t.Fatal("L1 message mismatch", "expected", queueIndex, "got", got)
5050
}
51+
52+
max := ReadHighestSyncedQueueIndex(db)
53+
if max != 123 {
54+
t.Fatal("max index mismatch", "expected", 123, "got", max)
55+
}
5156
}
5257

5358
func TestIterateL1Message(t *testing.T) {
@@ -62,6 +67,11 @@ func TestIterateL1Message(t *testing.T) {
6267
db := NewMemoryDatabase()
6368
WriteL1Messages(db, msgs)
6469

70+
max := ReadHighestSyncedQueueIndex(db)
71+
if max != 1000 {
72+
t.Fatal("max index mismatch", "expected", 1000, "got", max)
73+
}
74+
6575
it := IterateL1MessagesFrom(db, 103)
6676
defer it.Release()
6777

@@ -125,3 +135,25 @@ func TestReadWriteLastL1MessageInL2Block(t *testing.T) {
125135
}
126136
}
127137
}
138+
139+
func TestIterationStopsAtMaxQueueIndex(t *testing.T) {
140+
msgs := []types.L1MessageTx{
141+
newL1MessageTx(100),
142+
newL1MessageTx(101),
143+
newL1MessageTx(102),
144+
newL1MessageTx(103),
145+
}
146+
147+
db := NewMemoryDatabase()
148+
WriteL1Messages(db, msgs)
149+
150+
// artificially change max index from 103 to 102
151+
WriteHighestSyncedQueueIndex(db, 102)
152+
153+
// iteration should terminate at 102 and not read 103
154+
got := ReadL1MessagesFrom(db, 100, 10)
155+
156+
if len(got) != 3 {
157+
t.Fatal("Invalid length", "expected", 3, "got", len(got))
158+
}
159+
}

core/rawdb/schema.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ var (
107107
syncedL1BlockNumberKey = []byte("LastSyncedL1BlockNumber")
108108
l1MessagePrefix = []byte("l1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx
109109
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
110+
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")
110111

111112
// Row consumption
112113
rowConsumptionPrefix = []byte("rc") // rowConsumptionPrefix + hash -> row consumption by block

params/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
const (
2525
VersionMajor = 4 // Major version component of the current release
2626
VersionMinor = 4 // Minor version component of the current release
27-
VersionPatch = 13 // Patch version component of the current release
27+
VersionPatch = 14 // Patch version component of the current release
2828
VersionMeta = "sepolia" // Version metadata to append to the version string
2929
)
3030

0 commit comments

Comments
 (0)