Skip to content
Merged
Prev Previous commit
Next Next commit
increase attestation/aggregate queue sizes
when there are many validators, many aggregates and attestations arrive
every slot - increase the queue size a bit - also do batches on each
idle loop iteration since it's fairly quick
  • Loading branch information
arnetheduck committed Jan 26, 2021
commit 4fed4295afb313c0173e808cc60ef3948cad6352
23 changes: 21 additions & 2 deletions beacon_chain/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,21 @@ proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
elif aggregateFut.finished:
# aggregates will be dropped under heavy load on producer side
self[].processAggregate(aggregateFut.read())
for i in 0..<7: # process a few at a time - this is fairly fast
if self[].aggregatesQueue.empty():
break
self[].processAggregate(self[].aggregatesQueue.popFirstNoWait())

aggregateFut = self[].aggregatesQueue.popFirst()
elif attestationFut.finished:
# attestations will be dropped under heavy load on producer side
self[].processAttestation(attestationFut.read())

for i in 0..<7: # process a few at a time - this is fairly fast
if self[].attestationsQueue.empty():
break
self[].processAttestation(self[].attestationsQueue.popFirstNoWait())

attestationFut = self[].attestationsQueue.popFirst()

proc new*(T: type Eth2Processor,
Expand All @@ -560,6 +571,14 @@ proc new*(T: type Eth2Processor,
quarantine: quarantine,
blockReceivedDuringSlot: newFuture[void](),
blocksQueue: newAsyncQueue[BlockEntry](1),
aggregatesQueue: newAsyncQueue[AggregateEntry](MAX_ATTESTATIONS.int),
attestationsQueue: newAsyncQueue[AttestationEntry](TARGET_COMMITTEE_SIZE.int * 4),
# limit to the max number of aggregates we expect to see in one slot
aggregatesQueue: newAsyncQueue[AggregateEntry](
(TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int),
# This queue is a bit harder to bound reasonably - we want to get a good
# spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
# per committee - assuming randomness in vote arrival, this limit should
# cover that but of course, when votes arrive depends on a number of
# factors that are not entire random
attestationsQueue: newAsyncQueue[AttestationEntry](
(TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),
)