Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions proxy/airdropper/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,13 @@ def process_functions(self):
"""
Overrides IndexerBase.process_functions
"""
IndexerBase.process_functions(self)
self.debug("Process receipts")
self.process_receipts()
self.process_scheduled_trxs()

def process_receipts(self):
max_slot = 0
for slot, _, trx in self.transaction_receipts.get_txs(self.latest_processed_slot):
for slot, _, trx in self.get_tx_receipts():
max_slot = max(max_slot, slot)
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
Expand Down
17 changes: 3 additions & 14 deletions proxy/db/scheme.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,7 @@
neon_income BIGINT
);

CREATE TABLE IF NOT EXISTS solana_transaction_receipts (
slot BIGINT,
tx_idx INT,
signature VARCHAR(88),
tx BYTEA,
PRIMARY KEY (slot, signature)
);

CREATE TABLE IF NOT EXISTS test_storage (
slot BIGINT,
tx_idx INT,
signature VARCHAR(88),
tx BYTEA,
PRIMARY KEY (slot, signature)
CREATE TABLE IF NOT EXISTS solana_transaction_signatures (
slot BIGINT UNIQUE,
signature VARCHAR(88)
);
4 changes: 1 addition & 3 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ def __init__(self, solana_url, indexer_user: IIndexerUser):

def process_functions(self):
self.block_indexer.gather_blocks()
IndexerBase.process_functions(self)
self.process_receipts()
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
Expand All @@ -915,7 +914,7 @@ def process_receipts(self):
max_slot = 1
while max_slot > 0:
max_slot = 0
for slot, sign, tx in self.transaction_receipts.get_txs(self.indexed_slot, last_block_slot):
for slot, sign, tx in self.get_tx_receipts(last_block_slot):
max_slot = max(max_slot, slot)

ix_info = SolanaIxInfo(sign=sign, slot=slot, tx=tx)
Expand Down Expand Up @@ -955,7 +954,6 @@ def process_receipts(self):
"processed slots": self.indexed_slot - start_indexed_slot
},
latest_params={
"transaction receipts len": self.transaction_receipts.size(),
"indexed slot": self.indexed_slot,
"min used slot": self.min_used_slot
}
Expand Down
134 changes: 68 additions & 66 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group
from typing import List, Optional, Tuple
from typing import Dict, List, Optional, Union

from .trx_receipts_storage import TxReceiptsStorage
from .solana_signatures_db import SolanaSignatures
from .utils import MetricsToLogBuff
from ..common_neon.solana_interactor import SolanaInteractor
from ..indexer.sql_dict import SQLDict
Expand All @@ -20,18 +20,19 @@ def __init__(self,
solana: SolanaInteractor,
last_slot: int):
self.solana = solana
self.transaction_receipts = TxReceiptsStorage('solana_transaction_receipts')
self.solana_signatures = SolanaSignatures()
self.last_slot = self._init_last_slot('receipt', last_slot)
self.current_slot = 0
self.counter_ = 0
self.count_log = MetricsToLogBuff()
self._constants = SQLDict(tablename="constants")
self._maximum_tx = self._get_maximum_tx()
self._tx_receipts = {}

def _get_maximum_tx(self) -> str:
if "maximum_tx" in self._constants:
return self._constants["maximum_tx"]
return ""
return HISTORY_START

def _set_maximum_tx(self, tx: str):
self._maximum_tx = tx
Expand Down Expand Up @@ -90,109 +91,109 @@ def run(self):
def process_functions(self):
self.gather_unknown_transactions()

def gather_unknown_transactions(self):
start_time = time.time()
def get_tx_receipts(self, stop_slot=None):
signatures = self.gather_unknown_transactions()
self.debug(f'{len(signatures)}')

poll_txs = []
tx_list = []
for signature, _ in reversed(signatures):
if signature not in self._tx_receipts:
tx_list.append(signature)
if len(tx_list) >= 20:
poll_txs.append(tx_list)
tx_list = []
if len(tx_list) > 0:
poll_txs.append(tx_list)
self._get_txs(poll_txs)

minimal_tx = None
maximum_tx = None
maximum_slot = None
continue_flag = True
current_slot = self.solana.get_slot(commitment=FINALIZED)["result"]
tx_per_request = 20
for signature, _ in reversed(signatures):
if signature not in self._tx_receipts:
self.error(f'{signature} receipt not found')
continue

tx = self._tx_receipts[signature]
slot = tx['slot']
if stop_slot and slot > stop_slot:
break
yield (slot, signature, tx)

self._set_maximum_tx(signature)
self.solana_signatures.remove_signature(signature)
del self._tx_receipts[signature]

def gather_unknown_transactions(self):
minimal_tx = self.solana_signatures.get_minimal_tx()
continue_flag = True
counter = 0
gathered_signatures = 0
tx_list = []
while continue_flag:
results = self._get_signatures(minimal_tx, 1000)
results = self._get_signatures(minimal_tx, self._maximum_tx, INDEXER_POLL_COUNT)
len_results = len(results)
if len_results == 0:
break

minimal_tx = results[-1]["signature"]
if maximum_tx is None:
tx = results[0]
maximum_tx = tx["signature"]
maximum_slot = tx["slot"]

gathered_signatures += len_results
counter += 1

tx_idx = 0
prev_slot = 0

for tx in results:
sol_sign = tx["signature"]
slot = tx["slot"]

if slot != prev_slot:
tx_idx = 0
prev_slot = slot

if slot < self.last_slot:
if sol_sign == self._maximum_tx:
continue_flag = False
break

if sol_sign in [HISTORY_START, self._maximum_tx]:
if slot < self.last_slot:
continue_flag = False
break

tx_list.append((sol_sign, slot, tx_idx))
if len(tx_list) >= tx_per_request:
poll_txs.append(tx_list)
if len(tx_list) >= INDEXER_POLL_COUNT:
self.solana_signatures.add_signature(tx_list[0][0], tx_list[0][1])
tx_list = []
if len(poll_txs) >= INDEXER_POLL_COUNT / tx_per_request:
self._get_txs(poll_txs)

tx_idx += 1
tx_list.append((sol_sign, slot))

if len(tx_list) > 0:
poll_txs.append(tx_list)
if len(poll_txs) > 0:
self._get_txs(poll_txs)

self.current_slot = current_slot
self.counter_ = 0
self._set_maximum_tx(maximum_tx)
return tx_list

get_history_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
self.count_log.print(
self.debug,
list_params={"get_history_ms": get_history_ms, "gathered_signatures": gathered_signatures, "counter": counter},
latest_params={"maximum_tx": maximum_tx, "maximum_slot": maximum_slot}
)

def _get_signatures(self, before: Optional[str], limit: int) -> []:
response = self.solana.get_signatures_for_address(before, limit, FINALIZED)
def _get_signatures(self, before: Optional[str], until: Optional[str], limit: int) -> List[Dict[str, Union[int, str]]]:
opts: Dict[str, Union[int, str]] = {}
if before is not None:
opts["before"] = before
if until is not None:
opts["until"] = until
opts["limit"] = limit
opts["commitment"] = FINALIZED
response = self.solana._send_rpc_request("getSignaturesForAddress", EVM_LOADER_ID, opts)
error = response.get('error')
result = response.get('result', [])
if error:
self.warning(f'Fail to get signatures: {error}')
return result

def _get_txs(self, poll_txs: List[List[Tuple[str, int, int]]]) -> None:
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self._get_tx_receipts, poll_txs)
poll_txs.clear()

def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None:
sign_list = []
filtered_list = []
for sol_sign, slot, tx_idx in full_list:
if not self.transaction_receipts.contains(slot, sol_sign):
sign_list.append(sol_sign)
filtered_list.append((sol_sign, slot, tx_idx))
def _get_txs(self, poll_txs: List[List[str]]) -> None:
if len(poll_txs) > 1:
pool = ThreadPool(min(PARALLEL_REQUESTS, len(poll_txs)))
pool.map(self._get_tx_receipts, poll_txs)
poll_txs.clear()
else:
if len(poll_txs) > 0:
self._get_tx_receipts(poll_txs[0])

def _get_tx_receipts(self, sign_list: List[str]) -> None:
if len(sign_list) == 0:
return

retry = RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
while retry > 0:
try:
tx_list = self.solana.get_multiple_receipts(sign_list)
for tx_info, tx in zip(filtered_list, tx_list):
sol_sign, slot, tx_idx = tx_info
self._add_tx(sol_sign, tx, slot, tx_idx)
for sol_sign, tx in zip(sign_list, tx_list):
self._add_tx(sol_sign, tx)
retry = 0
except Exception as err:
retry -= 1
Expand All @@ -206,16 +207,17 @@ def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None:
if self.counter_ % 100 == 0:
self.debug(f"Acquired {self.counter_} receipts")

def _add_tx(self, sol_sign, tx, slot, tx_idx):
def _add_tx(self, sol_sign, tx):
if tx is not None:
add = False
msg = tx['transaction']['message']
slot = tx['slot']
for instruction in msg['instructions']:
if msg["accountKeys"][instruction["programIdIndex"]] == EVM_LOADER_ID:
add = True
if add:
self.debug(f'{(slot, tx_idx, sol_sign)}')
self.transaction_receipts.add_tx(slot, tx_idx, sol_sign, tx)
self.debug(f'{(slot, sol_sign)}')
self._tx_receipts[sol_sign] = tx
else:
self.debug(f"trx is None {sol_sign}")

26 changes: 26 additions & 0 deletions proxy/indexer/solana_signatures_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from ..indexer.base_db import BaseDB


class SolanaSignatures(BaseDB):
def __init__(self):
BaseDB.__init__(self, 'solana_transaction_signatures')

def add_signature(self, signature, slot):
with self._conn.cursor() as cursor:
cursor.execute(f'''
INSERT INTO solana_transaction_signatures
(slot, signature)
VALUES(%s, %s) ON CONFLICT DO NOTHING''',
(slot, signature))

def remove_signature(self, signature):
with self._conn.cursor() as cursor:
cursor.execute(f'DELETE FROM solana_transaction_signatures WHERE signature = %s', (signature,))

def get_minimal_tx(self):
with self._conn.cursor() as cursor:
cursor.execute(f'SELECT slot, signature FROM solana_transaction_signatures ORDER BY slot LIMIT 1')
row = cursor.fetchone()
if row is not None:
return row[1]
return None
77 changes: 0 additions & 77 deletions proxy/indexer/trx_receipts_storage.py

This file was deleted.

Loading