diff --git a/proxy/airdropper/airdropper.py b/proxy/airdropper/airdropper.py index b4ed81d86..3a9bf9ede 100644 --- a/proxy/airdropper/airdropper.py +++ b/proxy/airdropper/airdropper.py @@ -258,14 +258,13 @@ def process_functions(self): """ Overrides IndexerBase.process_functions """ - IndexerBase.process_functions(self) self.debug("Process receipts") self.process_receipts() self.process_scheduled_trxs() def process_receipts(self): max_slot = 0 - for slot, _, trx in self.transaction_receipts.get_txs(self.latest_processed_slot): + for slot, _, trx in self.get_tx_receipts(): max_slot = max(max_slot, slot) if trx['transaction']['message']['instructions'] is not None: self.process_trx_airdropper_mode(trx) diff --git a/proxy/db/scheme.sql b/proxy/db/scheme.sql index d795098e6..3cbfc1bd4 100644 --- a/proxy/db/scheme.sql +++ b/proxy/db/scheme.sql @@ -125,18 +125,7 @@ neon_income BIGINT ); - CREATE TABLE IF NOT EXISTS solana_transaction_receipts ( - slot BIGINT, - tx_idx INT, - signature VARCHAR(88), - tx BYTEA, - PRIMARY KEY (slot, signature) - ); - - CREATE TABLE IF NOT EXISTS test_storage ( - slot BIGINT, - tx_idx INT, - signature VARCHAR(88), - tx BYTEA, - PRIMARY KEY (slot, signature) + CREATE TABLE IF NOT EXISTS solana_transaction_signatures ( + slot BIGINT UNIQUE, + signature VARCHAR(88) ); diff --git a/proxy/indexer/indexer.py b/proxy/indexer/indexer.py index b6ac45b18..db61fd0bf 100644 --- a/proxy/indexer/indexer.py +++ b/proxy/indexer/indexer.py @@ -964,7 +964,6 @@ def __init__(self, solana_url, indexer_user: IIndexerUser): def process_functions(self): self.block_indexer.gather_blocks() - IndexerBase.process_functions(self) self.process_receipts() self.canceller.unlock_accounts(self.blocked_storages) self.blocked_storages = {} @@ -977,7 +976,7 @@ def process_receipts(self): max_slot = 1 while max_slot > 0: max_slot = 0 - for slot, sign, tx in self.transaction_receipts.get_txs(self.indexed_slot, last_block_slot): + for slot, sign, tx in 
def get_tx_receipts(self, stop_slot=None, tx_per_request=20):
    """Yield ``(slot, signature, receipt)`` for gathered signatures.

    The signatures returned by ``gather_unknown_transactions()`` are walked
    in reverse order (presumably oldest first -- assumes the gathered list is
    newest-first; confirm against the RPC ordering).  Receipts that are not
    yet in ``self._tx_receipts`` are requested through ``self._get_txs`` in
    batches of ``tx_per_request`` signatures.

    :param stop_slot: when truthy, iteration stops at the first receipt whose
        ``tx['slot']`` is greater than this value; that receipt stays cached.
    :param tx_per_request: number of signatures per receipt request batch
        (previously a hard-coded 20).
    :raises ValueError: if ``tx_per_request`` is smaller than 1.

    NOTE: the bookkeeping after the loop (removing consumed signatures,
    storing the new ``maximum_tx`` and calling ``_clear_tx_receipts``) only
    runs when the generator is driven until it finishes.
    """
    if tx_per_request < 1:
        raise ValueError('tx_per_request must be >= 1')

    signatures = self.gather_unknown_transactions()
    self.debug(f'{len(signatures)}')

    ordered = [signature for signature, _ in reversed(signatures)]

    # Only request receipts that are not already cached in memory.
    missing = [signature for signature in ordered if signature not in self._tx_receipts]
    poll_txs = [
        missing[start:start + tx_per_request]
        for start in range(0, len(missing), tx_per_request)
    ]
    self._get_txs(poll_txs)

    max_tx = self._maximum_tx
    remove_signatures = []
    for signature in ordered:
        tx = self._tx_receipts.get(signature)
        if tx is None:
            self.error(f'{signature} receipt not found')
            continue

        slot = tx['slot']
        if stop_slot and slot > stop_slot:
            break
        yield (slot, signature, tx)

        # Reached only after the consumer asks for the next item, i.e. once
        # the yielded receipt has been handled by the caller.
        remove_signatures.append(signature)
        del self._tx_receipts[signature]
        max_tx = signature

    self.solana_signatures.remove_signature(remove_signatures)
    self._set_maximum_tx(max_tx)
    self._clear_tx_receipts()


def gather_unknown_transactions(self):
    """Collect ``(signature, slot)`` pairs that have not been processed yet.

    Pages through ``self._get_signatures`` starting from the signature stored
    in ``self.solana_signatures`` (``get_minimal_tx()``; ``None`` when the
    table is empty) and stops when a page is empty, when ``self._maximum_tx``
    is reached, or when a slot below ``self.last_slot`` is seen.

    Whenever ``INDEXER_POLL_COUNT`` pairs have accumulated, the first pair of
    the accumulated list is persisted with ``add_signature`` and the list is
    restarted, so at most ``INDEXER_POLL_COUNT`` pairs are returned.

    :return: list of ``(signature, slot)`` tuples in the order they were
        received from ``_get_signatures``.
    """
    minimal_tx = self.solana_signatures.get_minimal_tx()
    tx_list = []
    while True:
        results = self._get_signatures(minimal_tx, INDEXER_POLL_COUNT)
        if not results:
            return tx_list

        # The next page continues before the last signature of this page.
        minimal_tx = results[-1]["signature"]
        for tx in results:
            sol_sign = tx["signature"]
            slot = tx["slot"]
            if sol_sign == self._maximum_tx or slot < self.last_slot:
                return tx_list

            if len(tx_list) >= INDEXER_POLL_COUNT:
                # Remember where this chunk started and begin a new chunk.
                self.solana_signatures.add_signature(tx_list[0][0], tx_list[0][1])
                tx_list = []

            tx_list.append((sol_sign, slot))
if len(tx_list) >= tx_per_request: - poll_txs.append(tx_list) + if len(tx_list) >= INDEXER_POLL_COUNT: + self.solana_signatures.add_signature(tx_list[0][0], tx_list[0][1]) tx_list = [] - if len(poll_txs) >= INDEXER_POLL_COUNT / tx_per_request: - self._get_txs(poll_txs) - tx_idx += 1 + tx_list.append((sol_sign, slot)) - if len(tx_list) > 0: - poll_txs.append(tx_list) - if len(poll_txs) > 0: - self._get_txs(poll_txs) - - self.current_slot = current_slot - self.counter_ = 0 - self._set_maximum_tx(maximum_tx) + return tx_list - get_history_ms = (time.time() - start_time) * 1000 # convert this into milliseconds - self.count_log.print( - self.debug, - list_params={"get_history_ms": get_history_ms, "gathered_signatures": gathered_signatures, "counter": counter}, - latest_params={"maximum_tx": maximum_tx, "maximum_slot": maximum_slot} - ) - def _get_signatures(self, before: Optional[str], limit: int) -> List: + def _get_signatures(self, before: Optional[str], limit: int) -> List[Dict[str, Union[int, str]]]: response = self.solana.get_signatures_for_address(before, limit, FINALIZED) error = response.get('error') result = response.get('result', []) @@ -171,18 +174,16 @@ def _get_signatures(self, before: Optional[str], limit: int) -> List: self.warning(f'Fail to get signatures: {error}') return result - def _get_txs(self, poll_txs: List[List[Tuple[str, int, int]]]) -> None: - pool = ThreadPool(PARALLEL_REQUESTS) - pool.map(self._get_tx_receipts, poll_txs) - poll_txs.clear() - - def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None: - sign_list = [] - filtered_list = [] - for sol_sign, slot, tx_idx in full_list: - if not self.transaction_receipts.contains(slot, sol_sign): - sign_list.append(sol_sign) - filtered_list.append((sol_sign, slot, tx_idx)) + def _get_txs(self, poll_txs: List[List[str]]) -> None: + if len(poll_txs) > 1: + pool = ThreadPool(min(PARALLEL_REQUESTS, len(poll_txs))) + pool.map(self._get_tx_receipts, poll_txs) + poll_txs.clear() + else: + 
if len(poll_txs) > 0: + self._get_tx_receipts(poll_txs[0]) + + def _get_tx_receipts(self, sign_list: List[str]) -> None: if len(sign_list) == 0: return @@ -190,9 +191,8 @@ def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None: while retry > 0: try: tx_list = self.solana.get_multiple_receipts(sign_list) - for tx_info, tx in zip(filtered_list, tx_list): - sol_sign, slot, tx_idx = tx_info - self._add_tx(sol_sign, tx, slot, tx_idx) + for sol_sign, tx in zip(sign_list, tx_list): + self._add_tx(sol_sign, tx) retry = 0 except Exception as err: retry -= 1 @@ -202,13 +202,16 @@ def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None: self.debug(f'Fail to get solana receipts: "{err}"') time.sleep(3) - self.counter_ += 1 - if self.counter_ % 100 == 0: - self.debug(f"Acquired {self.counter_} receipts") - - def _add_tx(self, sol_sign, tx, slot, tx_idx): + def _add_tx(self, sol_sign, tx): if tx is not None: - self.debug(f'{(slot, tx_idx, sol_sign)}') - self.transaction_receipts.add_tx(slot, tx_idx, sol_sign, tx) + slot = tx['slot'] + self.debug(f'{(slot, sol_sign)}') + self._tx_receipts[sol_sign] = tx else: self.debug(f"trx is None {sol_sign}") + + def _clear_tx_receipts(self): + self.counter_ += 1 + if self.counter_ > 1000: + self._tx_receipts = {} + self.counter_ = 0 diff --git a/proxy/indexer/solana_signatures_db.py b/proxy/indexer/solana_signatures_db.py new file mode 100644 index 000000000..a8a885a90 --- /dev/null +++ b/proxy/indexer/solana_signatures_db.py @@ -0,0 +1,27 @@ +from typing import List +from ..indexer.base_db import BaseDB + + +class SolanaSignatures(BaseDB): + def __init__(self): + BaseDB.__init__(self, 'solana_transaction_signatures') + + def add_signature(self, signature, slot): + with self._conn.cursor() as cursor: + cursor.execute(f''' + INSERT INTO solana_transaction_signatures + (slot, signature) + VALUES(%s, %s) ON CONFLICT DO NOTHING''', + (slot, signature)) + + def remove_signature(self, signatures: List[str]): + 
class SolanaSignatures(BaseDB):
    """Access to the ``solana_transaction_signatures`` table.

    The table stores ``(slot, signature)`` rows; the connection
    (``self._conn``) is provided by ``BaseDB``, which is not visible here.
    """

    def __init__(self):
        BaseDB.__init__(self, 'solana_transaction_signatures')

    def add_signature(self, signature, slot):
        """Insert ``(slot, signature)``; a conflicting row is left untouched."""
        with self._conn.cursor() as cursor:
            cursor.execute(
                '''
                INSERT INTO solana_transaction_signatures
                    (slot, signature)
                VALUES(%s, %s) ON CONFLICT DO NOTHING''',
                (slot, signature))

    def remove_signature(self, signatures: List[str]):
        """Delete every row whose signature is listed in ``signatures``."""
        if not signatures:
            # Nothing to delete -- do not open a cursor for an empty batch.
            return

        with self._conn.cursor() as cursor:
            cursor.executemany(
                'DELETE FROM solana_transaction_signatures WHERE signature = %s',
                [(signature,) for signature in signatures])

    def get_minimal_tx(self):
        """Return the signature of the row with the lowest slot, or ``None`` if the table is empty."""
        with self._conn.cursor() as cursor:
            cursor.execute(
                'SELECT slot, signature FROM solana_transaction_signatures ORDER BY slot LIMIT 1')
            row = cursor.fetchone()

        if row is None:
            return None
        return row[1]
None - - def get_txs(self, start_slot=0, stop_slot=0): - with self._conn.cursor() as cur: - cur.execute(f'SELECT MIN(slot) FROM {self._table_name} WHERE slot > %s', (start_slot,)) - min_slot_row = cur.fetchone() - min_slot = (min_slot_row[0] if min_slot_row and min_slot_row[0] else 0) - - cur.execute(f''' - SELECT MAX(t.slot) FROM ( - SELECT slot FROM {self._table_name} - WHERE slot > %s - ORDER BY slot - LIMIT {INDEXER_RECEIPTS_COUNT_LIMIT} - ) AS t - ''', - (start_slot,)) - limit_slot_row = cur.fetchone() - limit_slot = (limit_slot_row[0] if limit_slot_row and limit_slot_row[0] else 0) - - limit_slot = max(min_slot, limit_slot, start_slot + 1) - if stop_slot > 0: - limit_slot = min(stop_slot, limit_slot) - - cur.execute(f''' - SELECT slot, signature, tx FROM {self._table_name} - WHERE slot >= %s AND slot <= %s - ORDER BY slot ASC, tx_idx DESC - ''', - (start_slot, limit_slot,)) - rows = cur.fetchall() - - for row in rows: - yield int(row[0]), row[1], decode(row[2]) diff --git a/proxy/testing/test_trx_receipts_storage.py b/proxy/testing/test_trx_receipts_storage.py deleted file mode 100644 index b45e10e64..000000000 --- a/proxy/testing/test_trx_receipts_storage.py +++ /dev/null @@ -1,65 +0,0 @@ -from unittest import TestCase -from proxy.indexer.trx_receipts_storage import TxReceiptsStorage -from random import randint -from base58 import b58encode - - -class TestTxReceiptsStorage(TestCase): - @classmethod - def setUpClass(cls) -> None: - cls.tx_receipts_storage = TxReceiptsStorage('test_storage') - - def create_signature(self): - signature = b'' - for i in range(0, 5): - signature += randint(0, 255).to_bytes(1, byteorder='big') - return b58encode(signature).decode("utf-8") - - def create_slot_sig(self, max_slot): - slot = randint(0, max_slot) - return (slot, self.create_signature()) - - def test_data_consistency(self): - """ - Test that data put into container is stored there - """ - self.tx_receipts_storage.clear() - 
self.assertEqual(self.tx_receipts_storage.size(), 0) - self.assertEqual(self.tx_receipts_storage.max_known_tx(), (0, None)) - - max_slot = 10 - num_items = 100 - expected_items = [] - for idx in range(0, num_items): - slot, signature = self.create_slot_sig(max_slot) - tx = {'slot': slot, 'signature': signature} - self.tx_receipts_storage.add_tx(slot, idx, signature, tx) - expected_items.append((slot, idx, signature, tx)) - - self.assertEqual(self.tx_receipts_storage.max_known_tx()[0], max_slot) - self.assertEqual(self.tx_receipts_storage.size(), num_items) - for item in expected_items: - self.assertTrue(self.tx_receipts_storage.contains(item[0], item[2])) - - def test_query(self): - """ - Test get_txs method works as expected - """ - self.tx_receipts_storage.clear() - self.assertEqual(self.tx_receipts_storage.size(), 0) - - max_slot = 50 - num_items = 100 - expected_items = [] - for idx in range(0, num_items): - slot, signature = self.create_slot_sig(max_slot) - trx = {'slot': slot, 'signature': signature} - self.tx_receipts_storage.add_tx(slot, idx, signature, trx) - expected_items.append((slot, idx, signature, trx)) - - start_slot = randint(0, 50) - - # query in ascending order - retrieved_txs = [item for item in self.tx_receipts_storage.get_txs(start_slot)] - self.assertGreaterEqual(retrieved_txs[0][0], start_slot) - self.assertLessEqual(retrieved_txs[-1][0], max_slot)