Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions proxy/indexer/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,8 @@ def process_functions(self):

def process_receipts(self):
max_slot = 0
for slot_sig, trx in sorted(self.transaction_receipts.iteritems(), reverse=True):
slot, _signature = slot_sig
for slot, _, trx in self.transaction_receipts.get_trxs(self.latest_processed_slot, reverse=True):
max_slot = max(max_slot, slot)
if slot < self.latest_processed_slot:
break
if trx['transaction']['message']['instructions'] is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from ..common_neon.utils import get_from_dict
...
get_from_dict(trx, 'transaction', 'message', 'instructions')

self.process_trx_airdropper_mode(trx)
self.latest_processed_slot = max(self.latest_processed_slot, max_slot)
Expand Down
8 changes: 2 additions & 6 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,10 @@ def process_receipts(self):
seen_slots = set()
max_slot = 0

for slot_sig, trx in sorted(self.transaction_receipts.iteritems(), reverse=True):
slot, signature = slot_sig
for slot, signature, trx in self.transaction_receipts.get_trxs(self.processed_slot, reverse=True):
max_slot = max(max_slot, slot)
counter += 1

if slot < self.processed_slot:
break

if signature in self.sol_eth_trx:
continue

Expand Down Expand Up @@ -445,7 +441,7 @@ def process_receipts(self):
self.processed_slot = max(self.processed_slot, max_slot)

process_receipts_ms = (time.time() - start_time)*1000 # convert this into milliseconds
logger.debug(f"process_receipts_ms: {process_receipts_ms} transaction_receipts.len: {len(self.transaction_receipts)} from {self.processed_slot} to {self.current_slot} slots")
logger.debug(f"process_receipts_ms: {process_receipts_ms} transaction_receipts.len: {self.transaction_receipts.size()} from {self.processed_slot} to {self.current_slot} slots")



Expand Down
18 changes: 7 additions & 11 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

try:
from sql_dict import SQLDict
from trx_receipts_storage import TrxReceiptsStorage
except ImportError:
from .sql_dict import SQLDict
from .trx_receipts_storage import TrxReceiptsStorage


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
Expand Down Expand Up @@ -40,21 +42,16 @@ def __init__(self,

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.transaction_receipts = SQLDict(tablename="known_transactions", bin_key=True)
self.transaction_receipts = TrxReceiptsStorage('transaction_receipts', log_level)
self.last_slot = start_slot
self.current_slot = 0
self.counter_ = 0

if len(self.transaction_receipts) > 0:
self.max_known_tx = max(self.transaction_receipts)
else:
self.max_known_tx = (0, None)

self.max_known_tx = self.transaction_receipts.max_known_trx()
self._move_data_from_old_table()


def _move_data_from_old_table(self):
if len(self.transaction_receipts) == 0:
if self.transaction_receipts.size() == 0:
transaction_receipts_old = SQLDict(tablename="known_transactions")
for signature, trx in transaction_receipts_old.iteritems():
self._add_trx(signature, trx)
Expand Down Expand Up @@ -100,7 +97,6 @@ def gather_unknown_transactions(self):
for tx in results:
solana_signature = tx["signature"]
slot = tx["slot"]
slot_sig = (slot, solana_signature)

if slot < self.last_slot:
continue_flag = False
Expand All @@ -111,7 +107,7 @@ def gather_unknown_transactions(self):
continue_flag = False
break

if slot_sig not in self.transaction_receipts:
if not self.transaction_receipts.contains(slot, solana_signature):
poll_txs.add(solana_signature)

logger.debug("start getting receipts")
Expand Down Expand Up @@ -161,7 +157,7 @@ def _add_trx(self, solana_signature, trx):
add = True
if add:
logger.debug((trx['slot'], solana_signature))
self.transaction_receipts[(trx['slot'], solana_signature)] = trx
self.transaction_receipts.add_trx(trx['slot'], solana_signature, trx)
else:
logger.debug(f"trx is None {solana_signature}")

27 changes: 27 additions & 0 deletions proxy/indexer/pg_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import psycopg2
import os

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
"""Deserialize objects retrieved from SQLite."""
return loads(bytes(obj))


def dummy(obj):
"""Does nothing"""
return obj
30 changes: 2 additions & 28 deletions proxy/indexer/sql_dict.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,12 @@
import psycopg2
import os
import logging
from collections.abc import MutableMapping

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL

from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
, POSTGRES_HOST, encode, decode, dummy

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
"""Deserialize objects retrieved from SQLite."""
return loads(bytes(obj))


def dummy(obj):
"""Does nothing"""
return obj


class SQLDict(MutableMapping):
"""Serialize an object using pickle to a binary format accepted by SQLite."""

Expand Down
74 changes: 74 additions & 0 deletions proxy/indexer/trx_receipts_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import psycopg2
import os
import logging
from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
, POSTGRES_HOST, encode, decode, dummy

logger = logging.getLogger(__name__)

class TrxReceiptsStorage:
def __init__(self, table_name, log_level = logging.DEBUG):
self.table_name = table_name
logger.setLevel(log_level)
self.conn = psycopg2.connect(
dbname=POSTGRES_DB,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST
)

self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = self.conn.cursor()
cur.execute(f'''
CREATE TABLE IF NOT EXISTS
{self.table_name} (
slot BIGINT,
signature VARCHAR(88),
trx BYTEA,
PRIMARY KEY (slot, signature)
)
''')

def clear(self):
cur = self.conn.cursor()
cur.execute(f'DELETE FROM {self.table_name}')

def size(self):
cur = self.conn.cursor()
cur.execute(f'SELECT COUNT(*) FROM {self.table_name}')
rows = cur.fetchone()[0]
return rows if rows is not None else 0

def max_known_trx(self):
cur = self.conn.cursor()
cur.execute(f'SELECT slot, signature FROM {self.table_name} ORDER BY slot DESC, signature DESC LIMIT 1')
row = cur.fetchone()
if row is not None:
return (row[0], row[1])
return (0, None) #table empty - return default value

def add_trx(self, slot, signature, trx):
bin_trx = encode(trx)
cur = self.conn.cursor()
cur.execute(f'''
INSERT INTO {self.table_name} (slot, signature, trx)
VALUES ({slot},%s,%s)
ON CONFLICT (slot, signature)
DO UPDATE SET
trx = EXCLUDED.trx
''',
(signature, bin_trx)
)

def contains(self, slot, signature):
cur = self.conn.cursor()
cur.execute(f'SELECT 1 FROM {self.table_name} WHERE slot = %s AND signature = %s', (slot, signature,))
return cur.fetchone() is not None

def get_trxs(self, start_slot = 0, reverse = False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need extra spaces around = according to PEP8

def get_trxs(self, start_slot=0, reverse=False):

cur = self.conn.cursor()
order = 'DESC' if reverse else 'ASC'
cur.execute(f'SELECT slot, signature, trx FROM {self.table_name} WHERE slot >= {start_slot} ORDER BY slot {order}')
rows = cur.fetchall()
for row in rows:
yield row[0], row[1], decode(row[2])
68 changes: 68 additions & 0 deletions proxy/testing/test_trx_receipts_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from unittest import TestCase
from proxy.indexer.trx_receipts_storage import TrxReceiptsStorage
from random import randint
from base58 import b58encode


class TestTrxReceiptsStorage(TestCase):
@classmethod
def setUpClass(cls) -> None:
print("\n\nhttps://github.com/neonlabsorg/proxy-model.py/issues/421")
cls.testee = TrxReceiptsStorage('test_storage')

def create_signature(self):
signature = b''
for i in range(0, 5):
signature += randint(0, 255).to_bytes(1, byteorder='big')
return b58encode(signature).decode("utf-8")

def create_slot_sig(self, max_slot):
slot = randint(0, max_slot)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

random slot makes test fail "randomly"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

======================================================================
FAIL: test_query (proxy.testing.test_trx_receipts_storage.TestTrxReceiptsStorage)
Test get_trxs method workds as expected
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/proxy/testing/test_trx_receipts_storage.py", line 63, in test_query
    self.assertEqual(retrieved_trxs[0][0], start_slot)
AssertionError: 35 != 34

----------------------------------------------------------------------

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! This is due to probabilistic reasons: sometimes generated data not contains any transaction for given slot. In average, it will exist only two transaction for every given slot with current generation parameters (100 trxs in 50 slots randomly), but in reality there a cases when there are no transactions for given slot at all. Actually, testing expression must cover such cases, so I changed it.

return (slot, self.create_signature())

def test_data_consistency(self):
"""
Test that data put into container is stored there
"""
self.testee.clear()
self.assertEqual(self.testee.size(), 0)
self.assertEqual(self.testee.max_known_trx(), (0, None))

max_slot = 10
num_items = 100
expected_items = []
for _ in range(0, num_items):
slot, signature = self.create_slot_sig(max_slot)
trx = { 'slot': slot, 'signature': signature }
self.testee.add_trx(slot, signature, trx)
expected_items.append((slot, signature, trx))

self.assertEqual(self.testee.max_known_trx()[0], max_slot)
self.assertEqual(self.testee.size(), num_items)
for item in expected_items:
self.assertTrue(self.testee.contains(item[0], item[1]))

def test_query(self):
"""
Test get_trxs method workds as expected
"""
self.testee.clear()
self.assertEqual(self.testee.size(), 0)

max_slot = 50
num_items = 100
expected_items = []
for _ in range(0, num_items):
slot, signature = self.create_slot_sig(max_slot)
trx = { 'slot': slot, 'signature': signature }
self.testee.add_trx(slot, signature, trx)
expected_items.append((slot, signature, trx))

start_slot = 34
retrieved_trxs = [item for item in self.testee.get_trxs(start_slot, False)]
self.assertEqual(retrieved_trxs[0][0], start_slot)
self.assertEqual(retrieved_trxs[-1][0], max_slot)

retrieved_trxs = [item for item in self.testee.get_trxs(start_slot, True)]
self.assertEqual(retrieved_trxs[0][0], max_slot)
self.assertEqual(retrieved_trxs[-1][0], start_slot)