Skip to content
Next Next commit
When poll_txs gathered INDEXER_POLL_COUNT txs call gathering and …
…clear list
  • Loading branch information
otselnik committed Apr 15, 2022
commit 7a5ce8734e1c7ac0f0ee1c8fd25bff5a2369dd98
1 change: 1 addition & 0 deletions proxy/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
CONFIRM_TIMEOUT = max(int(os.environ.get("CONFIRM_TIMEOUT", 10)), 10)
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
INDEXER_POLL_COUNT = int(os.environ.get("INDEXER_POLL_COUNT", "1000"))
START_SLOT = os.environ.get('START_SLOT', 0)
FINALIZED = os.environ.get('FINALIZED', 'finalized')
CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))
Expand Down
19 changes: 14 additions & 5 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group
from typing import Optional
from typing import List, Optional, Tuple

from .trx_receipts_storage import TxReceiptsStorage
from .utils import MetricsToLogBuff
from ..common_neon.solana_interactor import SolanaInteractor
from ..indexer.sql_dict import SQLDict

from ..environment import RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
from ..environment import INDEXER_POLL_COUNT, RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
from ..environment import HISTORY_START, PARALLEL_REQUESTS, FINALIZED, EVM_LOADER_ID


Expand Down Expand Up @@ -138,10 +138,14 @@ def gather_unknown_transactions(self):

if not self.transaction_receipts.contains(slot, sol_sign):
poll_txs.append((sol_sign, slot, tx_idx))

if len(poll_txs) >= INDEXER_POLL_COUNT:
self._get_txs(poll_txs)

tx_idx += 1

pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self._get_tx_receipts, poll_txs)
if len(poll_txs) > 0:
self._get_txs(poll_txs)

self.current_slot = current_slot
self.counter_ = 0
Expand All @@ -162,7 +166,12 @@ def _get_signatures(self, before: Optional[str], limit: int) -> []:
self.warning(f'Fail to get signatures: {error}')
return result

def _get_tx_receipts(self, param):
def _get_txs(self, poll_txs: List[Tuple[str, int, int]]) -> None:
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self._get_tx_receipts, poll_txs)
poll_txs.clear()

def _get_tx_receipts(self, param: Tuple[str, int, int]) -> None:
# tx = None
retry = RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION

Expand Down