Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use new container in indexer and airdropper
  • Loading branch information
Ivan Loboda committed Dec 29, 2021
commit fe16a3f4315f27916f3681f7652a375418e9ca07
5 changes: 1 addition & 4 deletions proxy/indexer/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,8 @@ def process_functions(self):

def process_receipts(self):
max_slot = 0
for slot_sig, trx in sorted(self.transaction_receipts.iteritems(), reverse=True):
slot, _signature = slot_sig
for slot, _, trx in self.transaction_receipts.get_trxs(self.latest_processed_slot, reverse=True):
max_slot = max(max_slot, slot)
if slot < self.latest_processed_slot:
break
if trx['transaction']['message']['instructions'] is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from ..common_neon.utils import get_from_dict
...
get_from_dict(trx, 'transaction', 'message', 'instructions')

self.process_trx_airdropper_mode(trx)
self.latest_processed_slot = max(self.latest_processed_slot, max_slot)
Expand Down
7 changes: 2 additions & 5 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,11 @@ def process_receipts(self):
seen_slots = set()
max_slot = 0

for slot_sig, trx in sorted(self.transaction_receipts.iteritems(), reverse=True):
for slot_sig, trx in self.transaction_receipts.get_trxs(self.processed_slot, reverse=True):
slot, signature = slot_sig
max_slot = max(max_slot, slot)
counter += 1

if slot < self.processed_slot:
break

if signature in self.sol_eth_trx:
continue

Expand Down Expand Up @@ -445,7 +442,7 @@ def process_receipts(self):
self.processed_slot = max(self.processed_slot, max_slot)

process_receipts_ms = (time.time() - start_time)*1000 # convert this into milliseconds
logger.debug(f"process_receipts_ms: {process_receipts_ms} transaction_receipts.len: {len(self.transaction_receipts)} from {self.processed_slot} to {self.current_slot} slots")
logger.debug(f"process_receipts_ms: {process_receipts_ms} transaction_receipts.len: {self.transaction_receipts.size()} from {self.processed_slot} to {self.current_slot} slots")



Expand Down
18 changes: 7 additions & 11 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

try:
from sql_dict import SQLDict
from trx_receipts_storage import TrxReceiptsStorage
except ImportError:
from .sql_dict import SQLDict
from .trx_receipts_storage import TrxReceiptsStorage


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
Expand Down Expand Up @@ -40,21 +42,16 @@ def __init__(self,

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.transaction_receipts = SQLDict(tablename="known_transactions", bin_key=True)
self.transaction_receipts = TrxReceiptsStorage('transaction_receipts', log_level)
self.last_slot = start_slot
self.current_slot = 0
self.counter_ = 0

if len(self.transaction_receipts) > 0:
self.max_known_tx = max(self.transaction_receipts)
else:
self.max_known_tx = (0, None)

self.max_known_tx = self.transaction_receipts.max_known_trx()
self._move_data_from_old_table()


def _move_data_from_old_table(self):
if len(self.transaction_receipts) == 0:
if self.transaction_receipts.size() == 0:
transaction_receipts_old = SQLDict(tablename="known_transactions")
for signature, trx in transaction_receipts_old.iteritems():
self._add_trx(signature, trx)
Expand Down Expand Up @@ -100,7 +97,6 @@ def gather_unknown_transactions(self):
for tx in results:
solana_signature = tx["signature"]
slot = tx["slot"]
slot_sig = (slot, solana_signature)

if slot < self.last_slot:
continue_flag = False
Expand All @@ -111,7 +107,7 @@ def gather_unknown_transactions(self):
continue_flag = False
break

if slot_sig not in self.transaction_receipts:
if not self.transaction_receipts.contains(slot, solana_signature):
poll_txs.add(solana_signature)

logger.debug("start getting receipts")
Expand Down Expand Up @@ -161,7 +157,7 @@ def _add_trx(self, solana_signature, trx):
add = True
if add:
logger.debug((trx['slot'], solana_signature))
self.transaction_receipts[(trx['slot'], solana_signature)] = trx
self.transaction_receipts.add_trx(trx['slot'], solana_signature, trx)
else:
logger.debug(f"trx is None {solana_signature}")

30 changes: 2 additions & 28 deletions proxy/indexer/sql_dict.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,12 @@
import psycopg2
import os
import logging
from collections.abc import MutableMapping

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL

from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
, POSTGRES_HOST, encode, decode, dummy

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
"""Deserialize objects retrieved from SQLite."""
return loads(bytes(obj))


def dummy(obj):
"""Does nothing"""
return obj


class SQLDict(MutableMapping):
"""Serialize an object using pickle to a binary format accepted by SQLite."""

Expand Down