Skip to content
Merged
2 changes: 2 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ steps:
- "solana.log"
- "measurements.log"
- "evm_loader.log"
- "faucet.log"
- "airdropper.log"

- wait

Expand Down
23 changes: 8 additions & 15 deletions proxy/indexer/airdropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self,
neon_decimals = 9,
start_slot = 0):
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level, start_slot)
self.latest_processed_slot = 0

# collection of eth-address-to-create-accout-trx mappings
# for every addresses that was already funded with airdrop
Expand Down Expand Up @@ -95,7 +96,7 @@ def _airdrop_to(self, create_acc):
if not resp.ok:
logger.warning(f'Failed to airdrop: {resp.status_code}')
return

self.airdrop_ready[eth_address] = create_acc


Expand Down Expand Up @@ -143,20 +144,12 @@ def process_functions(self):


def process_receipts(self):
counter = 0
for signature in self.transaction_order:
counter += 1
if signature in self.transaction_receipts:
trx = self.transaction_receipts[signature]
if trx is None:
logger.error("trx is None")
del self.transaction_receipts[signature]
continue
if 'slot' not in trx:
logger.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
exit()
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
max_slot = 0
for slot, _, trx in self.transaction_receipts.get_trxs(self.latest_processed_slot, reverse=True):
max_slot = max(max_slot, slot)
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
self.latest_processed_slot = max(self.latest_processed_slot, max_slot)


def run_airdropper(solana_url,
Expand Down
552 changes: 288 additions & 264 deletions proxy/indexer/indexer.py

Large diffs are not rendered by default.

103 changes: 58 additions & 45 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

try:
from sql_dict import SQLDict
from trx_receipts_storage import TrxReceiptsStorage
except ImportError:
from .sql_dict import SQLDict
from .trx_receipts_storage import TrxReceiptsStorage


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
Expand Down Expand Up @@ -40,11 +42,19 @@ def __init__(self,

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.transaction_receipts = TrxReceiptsStorage('transaction_receipts', log_level)
self.last_slot = start_slot
self.current_slot = 0
self.transaction_order = []
self.counter_ = 0
self.max_known_tx = self.transaction_receipts.max_known_trx()
self._move_data_from_old_table()


def _move_data_from_old_table(self):
if self.transaction_receipts.size() == 0:
transaction_receipts_old = SQLDict(tablename="known_transactions")
for signature, trx in transaction_receipts_old.iteritems():
self._add_trx(signature, trx)


def run(self):
Expand All @@ -53,6 +63,7 @@ def run(self):
self.process_functions()
except Exception as err:
logger.warning("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)
time.sleep(1.0)


def process_functions(self):
Expand All @@ -62,82 +73,72 @@ def process_functions(self):

def gather_unknown_transactions(self):
poll_txs = set()
ordered_txs = []

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="confirmed")["result"]
maximum_slot = self.last_slot
minimal_slot = current_slot

max_known_tx = self.max_known_tx

counter = 0
while (continue_flag):
opts: Dict[str, Union[int, str]] = {}
if minimal_tx:
opts["before"] = minimal_tx
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
results = self._get_signatures(minimal_tx, self.max_known_tx[1])
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(results)))
counter += 1

if len(result["result"]) == 0:
logger.debug("len(result['result']) == 0")
if len(results) == 0:
logger.debug("len(results) == 0")
break

for tx in result["result"]:
minimal_tx = results[-1]["signature"]
max_tx = (results[0]["slot"], results[0]["signature"])
max_known_tx = max(max_known_tx, max_tx)

for tx in results:
solana_signature = tx["signature"]
slot = tx["slot"]

if slot < self.last_slot:
continue_flag = False
break

if solana_signature in HISTORY_START:
logger.debug(solana_signature)
continue_flag = False
break

ordered_txs.append(solana_signature)

if solana_signature not in self.transaction_receipts:
if not self.transaction_receipts.contains(slot, solana_signature):
poll_txs.add(solana_signature)

if slot < minimal_slot:
minimal_slot = slot
minimal_tx = solana_signature

if slot > maximum_slot:
maximum_slot = slot

if slot < self.last_slot:
continue_flag = False
break

logger.debug("start getting receipts")
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self.get_tx_receipts, poll_txs)

if len(self.transaction_order):
index = 0
try:
index = ordered_txs.index(self.transaction_order[0])
except ValueError:
self.transaction_order = ordered_txs + self.transaction_order
else:
self.transaction_order = ordered_txs[:index] + self.transaction_order
else:
self.transaction_order = ordered_txs
pool.map(self._get_tx_receipts, poll_txs)

self.last_slot = maximum_slot
self.current_slot = current_slot

self.counter_ = 0
logger.debug(max_known_tx)
self.max_known_tx = max_known_tx


def _get_signatures(self, before, until):
opts: Dict[str, Union[int, str]] = {}
if until is not None:
opts["until"] = until
if before is not None:
opts["before"] = before
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
return result['result']


def get_tx_receipts(self, solana_signature):
def _get_tx_receipts(self, solana_signature):
# trx = None
retry = True

while retry:
try:
trx = self.client.get_confirmed_transaction(solana_signature)['result']
self.transaction_receipts[solana_signature] = trx
self._add_trx(solana_signature, trx)
retry = False
except Exception as err:
logger.debug(err)
Expand All @@ -147,4 +148,16 @@ def get_tx_receipts(self, solana_signature):
if self.counter_ % 100 == 0:
logger.debug(self.counter_)

# return (solana_signature, trx)

def _add_trx(self, solana_signature, trx):
if trx is not None:
add = False
for instruction in trx['transaction']['message']['instructions']:
if trx["transaction"]["message"]["accountKeys"][instruction["programIdIndex"]] == self.evm_loader_id:
add = True
if add:
logger.debug((trx['slot'], solana_signature))
self.transaction_receipts.add_trx(trx['slot'], solana_signature, trx)
else:
logger.debug(f"trx is None {solana_signature}")

27 changes: 27 additions & 0 deletions proxy/indexer/pg_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import psycopg2
import os

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
"""Deserialize objects retrieved from SQLite."""
return loads(bytes(obj))


def dummy(obj):
"""Does nothing"""
return obj
Loading