Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Decrease the storage usage for Solana receipts neonlabsorg/proxy-mode…
…l.py#764
  • Loading branch information
otselnik committed May 16, 2022
commit 15085501835b6fdf00dc3f77052bb4b0926fb966
3 changes: 1 addition & 2 deletions proxy/airdropper/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,13 @@ def process_functions(self):
"""
Overrides IndexerBase.process_functions
"""
IndexerBase.process_functions(self)
self.debug("Process receipts")
self.process_receipts()
self.process_scheduled_trxs()

def process_receipts(self):
max_slot = 0
for slot, _, trx in self.transaction_receipts.get_txs(self.latest_processed_slot):
for slot, _, trx in self.get_tx_receipts():
max_slot = max(max_slot, slot)
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
Expand Down
6 changes: 6 additions & 0 deletions proxy/db/scheme.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,9 @@
tx BYTEA,
PRIMARY KEY (slot, signature)
);

CREATE TABLE IF NOT EXISTS solana_transaction_signatures (
slot BIGINT,
signature VARCHAR(88),
PRIMARY KEY (slot, signature)
);
4 changes: 1 addition & 3 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ def __init__(self, solana_url, indexer_user: IIndexerUser):

def process_functions(self):
self.block_indexer.gather_blocks()
IndexerBase.process_functions(self)
self.process_receipts()
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
Expand All @@ -915,7 +914,7 @@ def process_receipts(self):
max_slot = 1
while max_slot > 0:
max_slot = 0
for slot, sign, tx in self.transaction_receipts.get_txs(self.indexed_slot, last_block_slot):
for slot, sign, tx in self.get_tx_receipts(last_block_slot):
max_slot = max(max_slot, slot)

ix_info = SolanaIxInfo(sign=sign, slot=slot, tx=tx)
Expand Down Expand Up @@ -955,7 +954,6 @@ def process_receipts(self):
"processed slots": self.indexed_slot - start_indexed_slot
},
latest_params={
"transaction receipts len": self.transaction_receipts.size(),
"indexed slot": self.indexed_slot,
"min used slot": self.min_used_slot
}
Expand Down
122 changes: 58 additions & 64 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group
from typing import List, Optional, Tuple
from typing import Dict, List, Optional, Union

from .trx_receipts_storage import TxReceiptsStorage
from .solana_signatures_db import SolanaSignatures
from .utils import MetricsToLogBuff
from ..common_neon.solana_interactor import SolanaInteractor
from ..indexer.sql_dict import SQLDict
Expand All @@ -20,18 +20,19 @@ def __init__(self,
solana: SolanaInteractor,
last_slot: int):
self.solana = solana
self.transaction_receipts = TxReceiptsStorage('solana_transaction_receipts')
self.solana_signatures = SolanaSignatures()
self.last_slot = self._init_last_slot('receipt', last_slot)
self.current_slot = 0
self.counter_ = 0
self.count_log = MetricsToLogBuff()
self._constants = SQLDict(tablename="constants")
self._maximum_tx = self._get_maximum_tx()
self._tx_receipts = {}

def _get_maximum_tx(self) -> str:
if "maximum_tx" in self._constants:
return self._constants["maximum_tx"]
return ""
return HISTORY_START

def _set_maximum_tx(self, tx: str):
self._maximum_tx = tx
Expand Down Expand Up @@ -90,109 +91,101 @@ def run(self):
def process_functions(self):
self.gather_unknown_transactions()

def gather_unknown_transactions(self):
start_time = time.time()
def get_tx_receipts(self, stop_slot=None):
signatures = self.gather_unknown_transactions()

poll_txs = []
tx_list = []
for signature in signatures:
if signature not in self._tx_receipts:
tx_list.append(signature)
if len(tx_list) >= 20:
poll_txs.append(tx_list)
poll_txs.append(tx_list)
self._get_txs(poll_txs)

minimal_tx = None
maximum_tx = None
maximum_slot = None
continue_flag = True
current_slot = self.solana.get_slot(commitment=FINALIZED)["result"]
tx_per_request = 20
for signature in signatures:
if signature not in self._tx_receipts:
self.error(f'{signature} receipt not found')
continue

tx = self._tx_receipts[signature]
slot = tx['slot']
if stop_slot and slot > stop_slot:
break
yield (slot, signature, tx)

self._set_maximum_tx(signature)
self.solana_signatures.remove_signature(signature)
del self._tx_receipts[signature]

def gather_unknown_transactions(self):
minimal_tx = self.solana_signatures.get_minimal_tx()
continue_flag = True
counter = 0
gathered_signatures = 0
tx_list = []
while continue_flag:
results = self._get_signatures(minimal_tx, 1000)
results = self._get_signatures(minimal_tx, self._maximum_tx, INDEXER_POLL_COUNT)
len_results = len(results)
if len_results == 0:
break

minimal_tx = results[-1]["signature"]
if maximum_tx is None:
tx = results[0]
maximum_tx = tx["signature"]
maximum_slot = tx["slot"]

gathered_signatures += len_results
counter += 1

tx_idx = 0
prev_slot = 0

for tx in results:
sol_sign = tx["signature"]
slot = tx["slot"]

if slot != prev_slot:
tx_idx = 0
prev_slot = slot

if slot < self.last_slot:
continue_flag = False
break

if sol_sign in [HISTORY_START, self._maximum_tx]:
continue_flag = False
break

tx_list.append((sol_sign, slot, tx_idx))
if len(tx_list) >= tx_per_request:
poll_txs.append(tx_list)
if len(tx_list) >= INDEXER_POLL_COUNT:
self.solana_signatures.add_signature(tx_list[0][0], tx_list[0][1])
tx_list = []
if len(poll_txs) >= INDEXER_POLL_COUNT / tx_per_request:
self._get_txs(poll_txs)

tx_idx += 1
tx_list.append((sol_sign, slot))

if len(tx_list) > 0:
poll_txs.append(tx_list)
if len(poll_txs) > 0:
self._get_txs(poll_txs)

self.current_slot = current_slot
self.counter_ = 0
self._set_maximum_tx(maximum_tx)
signatures = []
for signature, _ in reversed(tx_list):
signatures.append(signature)
return signatures

get_history_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
self.count_log.print(
self.debug,
list_params={"get_history_ms": get_history_ms, "gathered_signatures": gathered_signatures, "counter": counter},
latest_params={"maximum_tx": maximum_tx, "maximum_slot": maximum_slot}
)

def _get_signatures(self, before: Optional[str], limit: int) -> []:
response = self.solana.get_signatures_for_address(before, limit, FINALIZED)
def _get_signatures(self, before: Optional[str], until: Optional[str], limit: int) -> List[Dict[str, Union[int, str]]]:
opts: Dict[str, Union[int, str]] = {}
if before is not None:
opts["before"] = before
if until is not None:
opts["until"] = until
opts["limit"] = limit
opts["commitment"] = FINALIZED
response = self.solana._send_rpc_request("getSignaturesForAddress", EVM_LOADER_ID, opts)
error = response.get('error')
result = response.get('result', [])
if error:
self.warning(f'Fail to get signatures: {error}')
return result

def _get_txs(self, poll_txs: List[List[Tuple[str, int, int]]]) -> None:
def _get_txs(self, poll_txs: List[List[str]]) -> None:
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self._get_tx_receipts, poll_txs)
poll_txs.clear()

def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None:
sign_list = []
filtered_list = []
for sol_sign, slot, tx_idx in full_list:
if not self.transaction_receipts.contains(slot, sol_sign):
sign_list.append(sol_sign)
filtered_list.append((sol_sign, slot, tx_idx))
def _get_tx_receipts(self, sign_list: List[str]) -> None:
if len(sign_list) == 0:
return

retry = RETRY_ON_FAIL_ON_GETTING_CONFIRMED_TRANSACTION
while retry > 0:
try:
tx_list = self.solana.get_multiple_receipts(sign_list)
for tx_info, tx in zip(filtered_list, tx_list):
sol_sign, slot, tx_idx = tx_info
self._add_tx(sol_sign, tx, slot, tx_idx)
for sol_sign, tx in zip(sign_list, tx_list):
self._add_tx(sol_sign, tx)
retry = 0
except Exception as err:
retry -= 1
Expand All @@ -206,16 +199,17 @@ def _get_tx_receipts(self, full_list: List[Tuple[str, int, int]]) -> None:
if self.counter_ % 100 == 0:
self.debug(f"Acquired {self.counter_} receipts")

def _add_tx(self, sol_sign, tx, slot, tx_idx):
def _add_tx(self, sol_sign, tx):
if tx is not None:
add = False
msg = tx['transaction']['message']
slot = tx['slot']
for instruction in msg['instructions']:
if msg["accountKeys"][instruction["programIdIndex"]] == EVM_LOADER_ID:
add = True
if add:
self.debug(f'{(slot, tx_idx, sol_sign)}')
self.transaction_receipts.add_tx(slot, tx_idx, sol_sign, tx)
self.debug(f'{(slot, sol_sign)}')
self._tx_receipts[sol_sign] = tx
else:
self.debug(f"trx is None {sol_sign}")

26 changes: 26 additions & 0 deletions proxy/indexer/solana_signatures_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from ..indexer.base_db import BaseDB


class SolanaSignatures(BaseDB):
def __init__(self):
BaseDB.__init__(self, 'solana_transaction_signatures')

def add_signature(self, signature, slot):
with self._conn.cursor() as cursor:
cursor.execute(f'''
INSERT INTO solana_transaction_signatures
(slot, signature)
VALUES(%s, %s) ON CONFLICT DO NOTHING''',
(slot, signature))

def remove_signature(self, signature):
with self._conn.cursor() as cursor:
cursor.execute(f'DELETE FROM solana_transaction_signatures WHERE signature = %s', (signature,))

def get_minimal_tx(self):
with self._conn.cursor() as cursor:
cursor.execute(f'SELECT slot, signature FROM solana_transaction_signatures ORDER BY slot LIMIT 1')
row = cursor.fetchone()
if row is not None:
return row[1]
return None
77 changes: 0 additions & 77 deletions proxy/indexer/trx_receipts_storage.py

This file was deleted.

Loading