Skip to content
Merged
23 changes: 21 additions & 2 deletions beacon_chain/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,21 @@ proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
elif aggregateFut.finished:
# aggregates will be dropped under heavy load on producer side
self[].processAggregate(aggregateFut.read())
for i in 0..<7: # process a few at a time - this is fairly fast
if self[].aggregatesQueue.empty():
break
self[].processAggregate(self[].aggregatesQueue.popFirstNoWait())

aggregateFut = self[].aggregatesQueue.popFirst()
elif attestationFut.finished:
# attestations will be dropped under heavy load on producer side
self[].processAttestation(attestationFut.read())

for i in 0..<7: # process a few at a time - this is fairly fast
if self[].attestationsQueue.empty():
break
self[].processAttestation(self[].attestationsQueue.popFirstNoWait())

attestationFut = self[].attestationsQueue.popFirst()

proc new*(T: type Eth2Processor,
Expand All @@ -560,6 +571,14 @@ proc new*(T: type Eth2Processor,
quarantine: quarantine,
blockReceivedDuringSlot: newFuture[void](),
blocksQueue: newAsyncQueue[BlockEntry](1),
aggregatesQueue: newAsyncQueue[AggregateEntry](MAX_ATTESTATIONS.int),
attestationsQueue: newAsyncQueue[AttestationEntry](TARGET_COMMITTEE_SIZE.int * 4),
# limit to the max number of aggregates we expect to see in one slot
aggregatesQueue: newAsyncQueue[AggregateEntry](
(TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int),
# This queue is a bit harder to bound reasonably - we want to get a good
# spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
# per committee - assuming randomness in vote arrival, this limit should
# cover that but of course, when votes arrive depends on a number of
# factors that are not entire random
attestationsQueue: newAsyncQueue[AttestationEntry](
(TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),
)