diff --git a/proxy/common_neon/solana_interactor.py b/proxy/common_neon/solana_interactor.py index aa6f3bd61..c84e65fed 100644 --- a/proxy/common_neon/solana_interactor.py +++ b/proxy/common_neon/solana_interactor.py @@ -135,13 +135,13 @@ def _send_rpc_batch_request(self, method: str, params_list: List[Any]) -> List[R return full_response_data - def get_signatures_for_address(self, before, until, commitment='confirmed'): + def get_signatures_for_address(self, before: Optional[str], limit: int, commitment='confirmed') -> []: opts: Dict[str, Union[int, str]] = {} - if until is not None: - opts["until"] = until if before is not None: opts["before"] = before + opts["limit"] = limit opts["commitment"] = commitment + return self._send_rpc_request("getSignaturesForAddress", EVM_LOADER_ID, opts) def get_confirmed_transaction(self, sol_sign: str, encoding: str = "json"): diff --git a/proxy/environment.py b/proxy/environment.py index d43647ba1..5c539fd67 100644 --- a/proxy/environment.py +++ b/proxy/environment.py @@ -32,8 +32,7 @@ FUZZING_BLOCKHASH = os.environ.get("FUZZING_BLOCKHASH", "NO") == "YES" CONFIRM_TIMEOUT = max(int(os.environ.get("CONFIRM_TIMEOUT", 10)), 10) PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2")) -DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn" -HISTORY_START = [DEVNET_HISTORY_START] +HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn" START_SLOT = os.environ.get('START_SLOT', 0) FINALIZED = os.environ.get('FINALIZED', 'finalized') CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60")) diff --git a/proxy/indexer/indexer_base.py b/proxy/indexer/indexer_base.py index 23d511150..be060f22c 100644 --- a/proxy/indexer/indexer_base.py +++ b/proxy/indexer/indexer_base.py @@ -3,10 +3,12 @@ import traceback from multiprocessing.dummy import Pool as ThreadPool from logged_groups import logged_group +from typing import Optional from .trx_receipts_storage import TrxReceiptsStorage from .utils import MetricsToLogBuff from ..common_neon.solana_interactor import SolanaInteractor +from ..indexer.sql_dict import SQLDict from ..environment import RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION from ..environment import HISTORY_START, PARALLEL_REQUESTS, FINALIZED, EVM_LOADER_ID @@ -19,11 +21,21 @@ def __init__(self, last_slot: int): self.solana = solana self.transaction_receipts = TrxReceiptsStorage('transaction_receipts') - self.max_known_tx = self.transaction_receipts.max_known_trx() self.last_slot = self._init_last_slot('receipt', last_slot) self.current_slot = 0 self.counter_ = 0 self.count_log = MetricsToLogBuff() + self._constants = SQLDict(tablename="constants") + self._maximum_tx = self._get_maximum_tx() + + def _get_maximum_tx(self) -> str: + if "maximum_tx" in self._constants: + return self._constants["maximum_tx"] + return "" + + def _set_maximum_tx(self, tx: str): + self._maximum_tx = tx + self._constants["maximum_tx"] = tx def _init_last_slot(self, name: str, last_known_slot: int): """ @@ -66,12 +78,12 @@ def _init_last_slot(self, name: str, last_known_slot: int): return start_int_slot def run(self): - while (True): + while True: try: self.process_functions() except Exception as err: err_tb = "".join(traceback.format_tb(err.__traceback__)) - self.warning('Exception on submitting transaction. ' + + self.warning('Exception on transactions processing. ' + f'Type(err): {type(err)}, Error: {err}, Traceback: {err_tb}') time.sleep(1.0) @@ -83,24 +95,26 @@ def gather_unknown_transactions(self): poll_txs = set() minimal_tx = None + maximum_tx = None + maximum_slot = None continue_flag = True current_slot = self.solana.get_slot(commitment=FINALIZED)["result"] - max_known_tx = self.max_known_tx - counter = 0 gathered_signatures = 0 - while (continue_flag): - results = self._get_signatures(minimal_tx, self.max_known_tx[1]) - - if len(results) == 0: + while continue_flag: + results = self._get_signatures(minimal_tx, 1000) + len_results = len(results) + if len_results == 0: break minimal_tx = results[-1]["signature"] - max_tx = (results[0]["slot"], results[0]["signature"]) - max_known_tx = max(max_known_tx, max_tx) + if maximum_tx is None: + tx = results[0] + maximum_tx = tx["signature"] + maximum_slot = tx["slot"] - gathered_signatures += len(results) + gathered_signatures += len_results counter += 1 for tx in results: @@ -111,7 +125,7 @@ def gather_unknown_transactions(self): continue_flag = False break - if solana_signature in HISTORY_START: + if solana_signature in [ HISTORY_START, self._maximum_tx]: self.debug(solana_signature) continue_flag = False break @@ -124,17 +138,17 @@ def gather_unknown_transactions(self): self.current_slot = current_slot self.counter_ = 0 - self.max_known_tx = max_known_tx + self._set_maximum_tx(maximum_tx) get_history_ms = (time.time() - start_time) * 1000 # convert this into milliseconds self.count_log.print( self.debug, list_params={"get_history_ms": get_history_ms, "gathered_signatures": gathered_signatures, "counter": counter}, - latest_params={"max_known_tx": max_known_tx} + latest_params={"maximum_tx": maximum_tx, "maximum_slot": maximum_slot} ) - def _get_signatures(self, before, until): - response = self.solana.get_signatures_for_address(before, until, FINALIZED) + def _get_signatures(self, before: Optional[str], limit: int) -> []: + response = self.solana.get_signatures_for_address(before, limit, FINALIZED) error = response.get('error') result = response.get('result', []) if error: diff --git a/proxy/memdb/blocks_db.py b/proxy/memdb/blocks_db.py index f89c427d9..7933065e5 100644 --- a/proxy/memdb/blocks_db.py +++ b/proxy/memdb/blocks_db.py @@ -48,9 +48,7 @@ def execute(self) -> bool: return False def _get_latest_db_block(self): - self.latest_db_block_slot = self._b.db.get_latest_block().slot - if not self.latest_db_block_slot: - self.latest_db_block_slot = self._b.solana.get_recent_blockslot(commitment=FINALIZED) + self.latest_db_block_slot = self._b.solana.get_recent_blockslot(commitment=FINALIZED) def _get_solana_block_list(self) -> bool: latest_db_slot = self.latest_db_block_slot diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index f151d8171..e02b72c15 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -42,7 +42,7 @@ modelInstanceLock = threading.Lock() modelInstance = None -NEON_PROXY_PKG_VERSION = '0.7.1-dev' +NEON_PROXY_PKG_VERSION = '0.7.2-dev' NEON_PROXY_REVISION = 'NEON_PROXY_REVISION_TO_BE_REPLACED'