Skip to content
Prev Previous commit
421 optimise indexer transaction receipt storage (#423)
* Create TrxReceiptsStorage

* Use new container in indexer and airdropper

* Fix bug

* Fix assertions in tests, use random start_slot

Co-authored-by: Ivan Loboda <[email protected]>
  • Loading branch information
ivandzen and Ivan Loboda authored Dec 30, 2021
commit 799285c5b2aae03f4e11c83f119ccc70139b909b
5 changes: 1 addition & 4 deletions proxy/indexer/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,8 @@ def process_functions(self):

def process_receipts(self):
max_slot = 0
for slot_sig, trx in sorted(self.transaction_receipts.iteritems(), reverse=True):
slot, _signature = slot_sig
for slot, _, trx in self.transaction_receipts.get_trxs(self.latest_processed_slot, reverse=True):
max_slot = max(max_slot, slot)
if slot < self.latest_processed_slot:
break
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
self.latest_processed_slot = max(self.latest_processed_slot, max_slot)
Expand Down
8 changes: 2 additions & 6 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,10 @@ def process_receipts(self):
seen_slots = set()
max_slot = 0

for slot_sig, trx in sorted(self.transaction_receipts.iteritems(), reverse=True):
slot, signature = slot_sig
for slot, signature, trx in self.transaction_receipts.get_trxs(self.processed_slot, reverse=True):
max_slot = max(max_slot, slot)
counter += 1

if slot < self.processed_slot:
break

if signature in self.sol_eth_trx:
continue

Expand Down Expand Up @@ -445,7 +441,7 @@ def process_receipts(self):
self.processed_slot = max(self.processed_slot, max_slot)

process_receipts_ms = (time.time() - start_time)*1000 # convert this into milliseconds
logger.debug(f"process_receipts_ms: {process_receipts_ms} transaction_receipts.len: {len(self.transaction_receipts)} from {self.processed_slot} to {self.current_slot} slots")
logger.debug(f"process_receipts_ms: {process_receipts_ms} transaction_receipts.len: {self.transaction_receipts.size()} from {self.processed_slot} to {self.current_slot} slots")



Expand Down
18 changes: 7 additions & 11 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

try:
from sql_dict import SQLDict
from trx_receipts_storage import TrxReceiptsStorage
except ImportError:
from .sql_dict import SQLDict
from .trx_receipts_storage import TrxReceiptsStorage


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
Expand Down Expand Up @@ -40,21 +42,16 @@ def __init__(self,

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.transaction_receipts = SQLDict(tablename="known_transactions", bin_key=True)
self.transaction_receipts = TrxReceiptsStorage('transaction_receipts', log_level)
self.last_slot = start_slot
self.current_slot = 0
self.counter_ = 0

if len(self.transaction_receipts) > 0:
self.max_known_tx = max(self.transaction_receipts)
else:
self.max_known_tx = (0, None)

self.max_known_tx = self.transaction_receipts.max_known_trx()
self._move_data_from_old_table()


def _move_data_from_old_table(self):
if len(self.transaction_receipts) == 0:
if self.transaction_receipts.size() == 0:
transaction_receipts_old = SQLDict(tablename="known_transactions")
for signature, trx in transaction_receipts_old.iteritems():
self._add_trx(signature, trx)
Expand Down Expand Up @@ -100,7 +97,6 @@ def gather_unknown_transactions(self):
for tx in results:
solana_signature = tx["signature"]
slot = tx["slot"]
slot_sig = (slot, solana_signature)

if slot < self.last_slot:
continue_flag = False
Expand All @@ -111,7 +107,7 @@ def gather_unknown_transactions(self):
continue_flag = False
break

if slot_sig not in self.transaction_receipts:
if not self.transaction_receipts.contains(slot, solana_signature):
poll_txs.add(solana_signature)

logger.debug("start getting receipts")
Expand Down Expand Up @@ -161,7 +157,7 @@ def _add_trx(self, solana_signature, trx):
add = True
if add:
logger.debug((trx['slot'], solana_signature))
self.transaction_receipts[(trx['slot'], solana_signature)] = trx
self.transaction_receipts.add_trx(trx['slot'], solana_signature, trx)
else:
logger.debug(f"trx is None {solana_signature}")

27 changes: 27 additions & 0 deletions proxy/indexer/pg_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import psycopg2
import os

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL


def encode(obj):
    """Pickle *obj* and wrap the result as a PostgreSQL binary parameter.

    The object is serialized with ``dumps`` using ``PICKLE_PROTOCOL`` and the
    resulting bytes are wrapped in ``psycopg2.Binary`` so they can be passed
    to a query as a binary value (e.g. for a BYTEA column).
    """
    return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
    """Unpickle a binary value read back from PostgreSQL.

    *obj* is converted with ``bytes()`` first, so any bytes-like buffer is
    accepted, and the result is passed to ``loads``.
    """
    # NOTE(security): unpickling can execute arbitrary code. Only feed this
    # function data that was produced by a pickle ``dumps`` call of our own
    # and stored in a database we trust.
    return loads(bytes(obj))


def dummy(obj):
    """Identity codec: hand *obj* back exactly as it was received."""
    untouched = obj
    return untouched
30 changes: 2 additions & 28 deletions proxy/indexer/sql_dict.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,12 @@
import psycopg2
import os
import logging
from collections.abc import MutableMapping

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL

from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
, POSTGRES_HOST, encode, decode, dummy

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
"""Deserialize objects retrieved from SQLite."""
return loads(bytes(obj))


def dummy(obj):
"""Does nothing"""
return obj


class SQLDict(MutableMapping):
"""Serialize an object using pickle to a binary format accepted by SQLite."""

Expand Down
74 changes: 74 additions & 0 deletions proxy/indexer/trx_receipts_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import psycopg2
import os
import logging
from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
, POSTGRES_HOST, encode, decode, dummy

logger = logging.getLogger(__name__)

class TrxReceiptsStorage:
    """PostgreSQL-backed storage of transaction receipts keyed by (slot, signature).

    Receipts are serialized with ``encode`` into a BYTEA column and restored
    with ``decode``. The connection is switched to autocommit isolation level
    in ``__init__``, so no explicit commit calls are made by the methods.

    NOTE: ``table_name`` is interpolated directly into the SQL text (an
    identifier cannot be bound as a ``%s`` query parameter), so it must come
    from trusted code and never from external input. All values, on the
    other hand, are passed as bound parameters.
    """

    def __init__(self, table_name, log_level = logging.DEBUG):
        """Connect to the database and create the receipts table if missing.

        :param table_name: name of the table holding the receipts.
        :param log_level: level applied to this module's logger (note that
            the logger is module-wide, so the last created instance wins).
        """
        self.table_name = table_name
        logger.setLevel(log_level)
        self.conn = psycopg2.connect(
            dbname=POSTGRES_DB,
            user=POSTGRES_USER,
            password=POSTGRES_PASSWORD,
            host=POSTGRES_HOST
        )

        self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        # Using the cursor as a context manager closes it once the statement is done.
        with self.conn.cursor() as cur:
            cur.execute(f'''
                CREATE TABLE IF NOT EXISTS
                {self.table_name} (
                    slot        BIGINT,
                    signature   VARCHAR(88),
                    trx         BYTEA,
                    PRIMARY KEY (slot, signature)
                )
                ''')

    def clear(self):
        """Delete every row from the receipts table."""
        with self.conn.cursor() as cur:
            cur.execute(f'DELETE FROM {self.table_name}')

    def size(self):
        """Return the number of receipts currently stored."""
        with self.conn.cursor() as cur:
            cur.execute(f'SELECT COUNT(*) FROM {self.table_name}')
            rows = cur.fetchone()[0]
        return rows if rows is not None else 0

    def max_known_trx(self):
        """Return ``(slot, signature)`` of the greatest stored key.

        Rows are ordered by slot and then by signature, both descending.
        ``(0, None)`` is returned when the table is empty.
        """
        with self.conn.cursor() as cur:
            cur.execute(
                f'SELECT slot, signature FROM {self.table_name} '
                f'ORDER BY slot DESC, signature DESC LIMIT 1'
            )
            row = cur.fetchone()
        if row is not None:
            return (row[0], row[1])
        return (0, None)  # table empty - return default value

    def add_trx(self, slot, signature, trx):
        """Insert a receipt, overwriting the stored one on a key conflict.

        :param slot: slot number of the transaction.
        :param signature: transaction signature (the column is VARCHAR(88)).
        :param trx: receipt object; it is pickled by ``encode`` before storing.
        """
        bin_trx = encode(trx)
        with self.conn.cursor() as cur:
            # slot is bound as a parameter instead of being formatted into the SQL text.
            cur.execute(f'''
                INSERT INTO {self.table_name} (slot, signature, trx)
                VALUES (%s, %s, %s)
                ON CONFLICT (slot, signature)
                DO UPDATE SET
                    trx = EXCLUDED.trx
                ''',
                (slot, signature, bin_trx)
            )

    def contains(self, slot, signature):
        """Return True if a receipt with this ``(slot, signature)`` key is stored."""
        with self.conn.cursor() as cur:
            cur.execute(
                f'SELECT 1 FROM {self.table_name} WHERE slot = %s AND signature = %s',
                (slot, signature)
            )
            return cur.fetchone() is not None

    def get_trxs(self, start_slot = 0, reverse = False):
        """Yield ``(slot, signature, trx)`` for every receipt with slot >= start_slot.

        :param start_slot: lowest slot (inclusive) to return.
        :param reverse: when true rows come in descending order, otherwise ascending.

        Rows are ordered by slot and then by signature, so the order of
        receipts sharing a slot is deterministic. The query is executed when
        the generator is first advanced; all matching rows are fetched at
        once and then decoded one by one.
        """
        order = 'DESC' if reverse else 'ASC'
        with self.conn.cursor() as cur:
            # order is one of two literals chosen above; start_slot is bound as a parameter.
            cur.execute(
                f'SELECT slot, signature, trx FROM {self.table_name} '
                f'WHERE slot >= %s ORDER BY slot {order}, signature {order}',
                (start_slot,)
            )
            rows = cur.fetchall()
        for row in rows:
            yield row[0], row[1], decode(row[2])
71 changes: 71 additions & 0 deletions proxy/testing/test_trx_receipts_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from unittest import TestCase
from proxy.indexer.trx_receipts_storage import TrxReceiptsStorage
from random import randint
from base58 import b58encode


class TestTrxReceiptsStorage(TestCase):
    """Integration tests for TrxReceiptsStorage.

    The storage object is created once for the whole class and each test
    clears its table before inserting randomly generated receipts.
    Expectations are derived from the data actually generated, so the tests
    do not depend on a particular random outcome.
    """

    @classmethod
    def setUpClass(cls) -> None:
        print("\n\nhttps://github.com/neonlabsorg/proxy-model.py/issues/421")
        cls.testee = TrxReceiptsStorage('test_storage')

    def create_signature(self):
        """Return a base58 string built from 5 random bytes."""
        signature = bytes(randint(0, 255) for _ in range(5))
        return b58encode(signature).decode("utf-8")

    def create_slot_sig(self, max_slot):
        """Return a random ``(slot, signature)`` pair with 0 <= slot <= max_slot."""
        slot = randint(0, max_slot)
        return (slot, self.create_signature())

    def _fill_storage(self, max_slot, num_items):
        """Add num_items random receipts; return them as {(slot, signature): trx}.

        A dict is used because add_trx overwrites on a repeated key, so a
        (very unlikely) duplicate pair must be counted only once.
        """
        expected_items = {}
        for _ in range(num_items):
            slot, signature = self.create_slot_sig(max_slot)
            trx = {'slot': slot, 'signature': signature}
            self.testee.add_trx(slot, signature, trx)
            expected_items[(slot, signature)] = trx
        return expected_items

    def test_data_consistency(self):
        """
        Test that data put into container is stored there
        """
        self.testee.clear()
        self.assertEqual(self.testee.size(), 0)
        self.assertEqual(self.testee.max_known_trx(), (0, None))

        expected_items = self._fill_storage(max_slot=10, num_items=100)

        # Compare with the highest slot really generated: the random data
        # is not guaranteed to contain max_slot itself.
        highest_slot = max(slot for slot, _ in expected_items)
        self.assertEqual(self.testee.max_known_trx()[0], highest_slot)
        self.assertEqual(self.testee.size(), len(expected_items))
        for slot, signature in expected_items:
            self.assertTrue(self.testee.contains(slot, signature))

    def test_query(self):
        """
        Test get_trxs method works as expected
        """
        self.testee.clear()
        self.assertEqual(self.testee.size(), 0)

        max_slot = 50
        expected_items = self._fill_storage(max_slot=max_slot, num_items=100)

        start_slot = randint(0, max_slot)
        wanted = {
            key: trx for key, trx in expected_items.items() if key[0] >= start_slot
        }
        wanted_slots = sorted(slot for slot, _ in wanted)

        # Check both directions. Only the slot sequence is compared for
        # ordering; the full content is compared as a mapping. An empty
        # result (start_slot above every generated slot) is handled too.
        for reverse in (False, True):
            with self.subTest(reverse=reverse):
                retrieved_trxs = list(self.testee.get_trxs(start_slot, reverse))
                self.assertEqual(
                    [item[0] for item in retrieved_trxs],
                    sorted(wanted_slots, reverse=reverse),
                )
                self.assertEqual(
                    {(slot, signature): trx for slot, signature, trx in retrieved_trxs},
                    wanted,
                )