Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix bugs with getting list of executed transactions in indexer
  • Loading branch information
afalaleev committed Mar 18, 2022
commit db1ac0dde3075020d72ff5b443ac151fb0764580
6 changes: 3 additions & 3 deletions proxy/common_neon/solana_interactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,13 @@ def _send_rpc_batch_request(self, method: str, params_list: List[Any]) -> List[R

return full_response_data

def get_signatures_for_address(self, before, until, commitment='confirmed'):
def get_signatures_for_address(self, before: Optional[str], limit: int, commitment='confirmed') -> []:
opts: Dict[str, Union[int, str]] = {}
if until is not None:
opts["until"] = until
if before is not None:
opts["before"] = before
opts["limit"] = limit
opts["commitment"] = commitment

return self._send_rpc_request("getSignaturesForAddress", EVM_LOADER_ID, opts)

def get_confirmed_transaction(self, sol_sign: str, encoding: str = "json"):
Expand Down
3 changes: 1 addition & 2 deletions proxy/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
FUZZING_BLOCKHASH = os.environ.get("FUZZING_BLOCKHASH", "NO") == "YES"
CONFIRM_TIMEOUT = max(int(os.environ.get("CONFIRM_TIMEOUT", 10)), 10)
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]
HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
START_SLOT = os.environ.get('START_SLOT', 0)
FINALIZED = os.environ.get('FINALIZED', 'finalized')
CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))
Expand Down
48 changes: 31 additions & 17 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group
from typing import Optional

from .trx_receipts_storage import TrxReceiptsStorage
from .utils import MetricsToLogBuff
from ..common_neon.solana_interactor import SolanaInteractor
from ..indexer.sql_dict import SQLDict

from ..environment import RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
from ..environment import HISTORY_START, PARALLEL_REQUESTS, FINALIZED, EVM_LOADER_ID
Expand All @@ -19,11 +21,21 @@ def __init__(self,
last_slot: int):
self.solana = solana
self.transaction_receipts = TrxReceiptsStorage('transaction_receipts')
self.max_known_tx = self.transaction_receipts.max_known_trx()
self.last_slot = self._init_last_slot('receipt', last_slot)
self.current_slot = 0
self.counter_ = 0
self.count_log = MetricsToLogBuff()
self._constants = SQLDict(tablename="constants")
self._maximum_tx = self._get_maximum_tx()

def _get_maximum_tx(self) -> str:
if "maximum_tx" in self._constants:
return self._constants["maximum_tx"]
return ""

def _set_maximum_tx(self, tx: str):
self._maximum_tx = tx
self._constants["maximum_tx"] = tx

def _init_last_slot(self, name: str, last_known_slot: int):
"""
Expand Down Expand Up @@ -66,12 +78,12 @@ def _init_last_slot(self, name: str, last_known_slot: int):
return start_int_slot

def run(self):
while (True):
while True:
try:
self.process_functions()
except Exception as err:
err_tb = "".join(traceback.format_tb(err.__traceback__))
self.warning('Exception on submitting transaction. ' +
self.warning('Exception on transactions processing. ' +
f'Type(err): {type(err)}, Error: {err}, Traceback: {err_tb}')
time.sleep(1.0)

Expand All @@ -83,24 +95,26 @@ def gather_unknown_transactions(self):
poll_txs = set()

minimal_tx = None
maximum_tx = None
maximum_slot = None
continue_flag = True
current_slot = self.solana.get_slot(commitment=FINALIZED)["result"]

max_known_tx = self.max_known_tx

counter = 0
gathered_signatures = 0
while (continue_flag):
results = self._get_signatures(minimal_tx, self.max_known_tx[1])

if len(results) == 0:
while continue_flag:
results = self._get_signatures(minimal_tx, 1000)
len_results = len(results)
if len_results == 0:
break

minimal_tx = results[-1]["signature"]
max_tx = (results[0]["slot"], results[0]["signature"])
max_known_tx = max(max_known_tx, max_tx)
if maximum_tx is None:
tx = results[0]
maximum_tx = tx["signature"]
maximum_slot = tx["slot"]

gathered_signatures += len(results)
gathered_signatures += len_results
counter += 1

for tx in results:
Expand All @@ -111,7 +125,7 @@ def gather_unknown_transactions(self):
continue_flag = False
break

if solana_signature in HISTORY_START:
if solana_signature in [ HISTORY_START, self._maximum_tx]:
self.debug(solana_signature)
continue_flag = False
break
Expand All @@ -124,17 +138,17 @@ def gather_unknown_transactions(self):

self.current_slot = current_slot
self.counter_ = 0
self.max_known_tx = max_known_tx
self._set_maximum_tx(maximum_tx)

get_history_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
self.count_log.print(
self.debug,
list_params={"get_history_ms": get_history_ms, "gathered_signatures": gathered_signatures, "counter": counter},
latest_params={"max_known_tx": max_known_tx}
latest_params={"maximum_tx": maximum_tx, "maximum_slot": maximum_slot}
)

def _get_signatures(self, before, until):
response = self.solana.get_signatures_for_address(before, until, FINALIZED)
def _get_signatures(self, before: Optional[str], limit: int) -> []:
response = self.solana.get_signatures_for_address(before, limit, FINALIZED)
error = response.get('error')
result = response.get('result', [])
if error:
Expand Down
4 changes: 1 addition & 3 deletions proxy/memdb/blocks_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ def execute(self) -> bool:
return False

def _get_latest_db_block(self):
self.latest_db_block_slot = self._b.db.get_latest_block().slot
if not self.latest_db_block_slot:
self.latest_db_block_slot = self._b.solana.get_recent_blockslot(commitment=FINALIZED)
self.latest_db_block_slot = self._b.solana.get_recent_blockslot(commitment=FINALIZED)

def _get_solana_block_list(self) -> bool:
latest_db_slot = self.latest_db_block_slot
Expand Down
2 changes: 1 addition & 1 deletion proxy/plugin/solana_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
modelInstanceLock = threading.Lock()
modelInstance = None

NEON_PROXY_PKG_VERSION = '0.7.1-dev'
NEON_PROXY_PKG_VERSION = '0.7.2-dev'
NEON_PROXY_REVISION = 'NEON_PROXY_REVISION_TO_BE_REPLACED'


Expand Down