Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion proxy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
neon_decimals = int(os.environ.get('NEON_DECIMALS', '9'))

start_slot = os.environ.get('START_SLOT', None)
finalized = os.environ.get('FINALIZED', 'finalized')
if start_slot == 'LATEST':
client = Client(solana_url)
start_slot = client.get_slot(commitment="confirmed")["result"]
start_slot = client.get_slot(commitment=finalized)["result"]
if start_slot is None: # by default
start_slot = 0
else: # try to convert into integer
Expand Down
22 changes: 14 additions & 8 deletions proxy/indexer/blocks_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def _create_table_sql(self) -> str:
blocktime BIGINT,
signatures BYTEA,

UNIQUE(slot)
UNIQUE(slot, finalized)
);
CREATE INDEX IF NOT EXISTS {self._table_name}_hash ON {self._table_name}(hash);
CREATE INDEX IF NOT EXISTS {self._table_name}_height ON {self._table_name}(height);
CREATE INDEX IF NOT EXISTS {self._table_name}_hash ON {self._table_name}(hash, finalized);
CREATE INDEX IF NOT EXISTS {self._table_name}_height ON {self._table_name}(height, finalized);
"""

def _block_from_value(self, value, slot=None) -> SolanaBlockDBInfo:
Expand Down Expand Up @@ -66,16 +66,22 @@ def _full_block_from_value(self, value, slot=None) -> SolanaBlockDBInfo:
)

def get_block_by_slot(self, block_slot) -> SolanaBlockDBInfo:
return self._block_from_value(self._fetchone(self._column_lst, [('slot', block_slot)]), block_slot)
return self._block_from_value(
self._fetchone(self._column_lst, [('slot', block_slot)], ['finalized desc']),
block_slot)

def get_full_block_by_slot(self, block_slot) -> SolanaBlockDBInfo:
return self._full_block_from_value(self._fetchone(self._full_column_lst, [('slot', block_slot)]), block_slot)
return self._full_block_from_value(
self._fetchone(self._full_column_lst, [('slot', block_slot)], ['finalized desc']),
block_slot)

def get_block_by_hash(self, block_hash) -> SolanaBlockDBInfo:
return self._block_from_value(self._fetchone(self._column_lst, [('hash', block_hash)]))
return self._block_from_value(
self._fetchone(self._column_lst, [('hash', block_hash)], ['finalized desc']))

def get_block_by_height(self, block_num) -> SolanaBlockDBInfo:
return self._block_from_value(self._fetchone(self._column_lst, [('height', block_num)]))
return self._block_from_value(
self._fetchone(self._column_lst, [('height', block_num)], ['finalized desc']))

def set_block(self, block: SolanaBlockDBInfo):
cursor = self._conn.cursor()
Expand All @@ -84,7 +90,7 @@ def set_block(self, block: SolanaBlockDBInfo):
({', '.join(self._full_column_lst)})
VALUES
({', '.join(['%s' for _ in range(len(self._full_column_lst))])})
ON CONFLICT (slot) DO UPDATE SET
ON CONFLICT (slot, finalized) DO UPDATE SET
hash=EXCLUDED.hash,
height=EXCLUDED.height,
parent_hash=EXCLUDED.parent_hash,
Expand Down
19 changes: 10 additions & 9 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
try:
from indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
from indexer_db import IndexerDB
from utils import SolanaIxSignInfo, NeonTxResultInfo, NeonTxSignInfo, Canceller, str_fmt_object
from utils import SolanaIxSignInfo, NeonTxResultInfo, NeonTxSignInfo, Canceller, str_fmt_object, FINALIZED
except ImportError:
from .indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
from .indexer_db import IndexerDB
from .utils import SolanaIxSignInfo, NeonTxResultInfo, NeonTxInfo, Canceller, str_fmt_object
from .indexer_db import IndexerDB, FINALIZED
from .utils import SolanaIxSignInfo, NeonTxResultInfo, NeonTxInfo, Canceller, str_fmt_object, FINALIZED

from ..environment import EVM_LOADER_ID

Expand Down Expand Up @@ -625,6 +625,7 @@ def __init__(self,
self.blocked_storages = {}
self.processed_slot = self.db.get_min_receipt_slot()
logger.debug(f'Minimum receipt slot: {self.processed_slot}')
logger.debug(f'Finalized commitment: {FINALIZED}')

self.state = ReceiptsParserState(db=self.db, client=self.client)
self.ix_decoder_map = {
Expand Down Expand Up @@ -653,9 +654,11 @@ def __init__(self,
def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Start getting blocks")
self.gather_blocks()
(start_block_slot, last_block_slot) = self.gather_blocks()
logger.debug("Process receipts")
self.process_receipts()
logger.debug(f'remove not finalized data in range[{start_block_slot}..{last_block_slot}]')
self.db.del_not_finalized(from_slot=start_block_slot, to_slot=last_block_slot)
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
Expand Down Expand Up @@ -702,8 +705,8 @@ def gather_blocks(self):
height = -1
confirmed_blocks_len = 10000
client = self.client._provider
list_opts = {"commitment": "finalized"}
block_opts = {"commitment": "finalized", "transactionDetails": "none", "rewards": False}
list_opts = {"commitment": FINALIZED}
block_opts = {"commitment": FINALIZED, "transactionDetails": "none", "rewards": False}
while confirmed_blocks_len == 10000:
confirmed_blocks = client.make_request("getBlocksWithLimit", last_block_slot, confirmed_blocks_len, list_opts)['result']
confirmed_blocks_len = len(confirmed_blocks)
Expand All @@ -725,16 +728,14 @@ def gather_blocks(self):
break

# Everything is good
logger.debug(f'remove not finalized data in range[{start_block_slot}..{last_block_slot}]')
self.db.del_not_finalized(from_slot=start_block_slot, to_slot=last_block_slot)
start_block_slot = last_block_slot
logger.debug(f"gather_blocks from {height} to {max_height}")
self.db.fill_block_height(height, confirmed_blocks)
self.db.set_last_slot_height(last_block_slot, max_height)
height = max_height

gather_blocks_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
logger.debug(f"gather_blocks_ms: {gather_blocks_ms} last_height: {max_height} last_block_slot {last_block_slot}")
return start_block_slot, last_block_slot


def run_indexer(solana_url,
Expand Down
6 changes: 4 additions & 2 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
try:
from sql_dict import SQLDict
from trx_receipts_storage import TrxReceiptsStorage
from utils import FINALIZED
except ImportError:
from .sql_dict import SQLDict
from .trx_receipts_storage import TrxReceiptsStorage
from .utils import FINALIZED


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
Expand Down Expand Up @@ -79,7 +81,7 @@ def gather_unknown_transactions(self):

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="finalized")["result"]
current_slot = self.client.get_slot(commitment=FINALIZED)["result"]

max_known_tx = self.max_known_tx

Expand Down Expand Up @@ -129,7 +131,7 @@ def _get_signatures(self, before, until):
opts["until"] = until
if before is not None:
opts["before"] = before
opts["commitment"] = "finalized"
opts["commitment"] = FINALIZED
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
return result['result']

Expand Down
5 changes: 3 additions & 2 deletions proxy/indexer/indexer_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import traceback

try:
from utils import LogDB, NeonTxInfo, NeonTxResultInfo, SolanaIxSignInfo
from utils import LogDB, NeonTxInfo, NeonTxResultInfo, SolanaIxSignInfo, FINALIZED
from blocks_db import SolanaBlocksDB, SolanaBlockDBInfo
from transactions_db import NeonTxsDB, NeonTxDBInfo
from sql_dict import SQLDict
except ImportError:
from .utils import LogDB, NeonTxInfo, NeonTxResultInfo, SolanaIxSignInfo
from .utils import LogDB, NeonTxInfo, NeonTxResultInfo, SolanaIxSignInfo, FINALIZED
from .blocks_db import SolanaBlocksDB, SolanaBlockDBInfo
from .transactions_db import NeonTxsDB, NeonTxDBInfo
from .sql_dict import SQLDict
Expand Down Expand Up @@ -61,6 +61,7 @@ def _fill_block_from_net(self, block: SolanaBlockDBInfo):
block.signs = net_block['signatures']
block.parent_hash = '0x' + base58.b58decode(net_block['previousBlockhash']).hex()
block.time = net_block['blockTime']
block.finalized = ("confirmed" == FINALIZED)
logger.debug(f'{block}')
self._blocks_db.set_block(block)
return block
Expand Down
8 changes: 5 additions & 3 deletions proxy/indexer/transactions_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _create_table_sql(self) -> str:
calldata TEXT,
logs BYTEA,

UNIQUE(neon_sign),
UNIQUE(neon_sign, finalized),
UNIQUE(sol_sign, idx)
);
CREATE INDEX IF NOT EXISTS {self._table_name}_finalized ON {self._table_name}(slot, finalized);
Expand Down Expand Up @@ -152,7 +152,9 @@ def del_not_finalized(self, from_slot: int, to_slot: int):
(from_slot, to_slot))

def get_tx_by_neon_sign(self, neon_sign) -> NeonTxDBInfo:
return self._tx_from_value(self._fetchone(self._column_lst, [('neon_sign', neon_sign)]))
return self._tx_from_value(
self._fetchone(self._column_lst, [('neon_sign', neon_sign)], ['finalized desc']))

def get_tx_by_sol_sign(self, sol_sign) -> NeonTxDBInfo:
return self._tx_from_value(self._fetchone(self._column_lst, [('sol_sign', sol_sign)]))
return self._tx_from_value(
self._fetchone(self._column_lst, [('sol_sign', sol_sign)], ['finalized desc']))
14 changes: 10 additions & 4 deletions proxy/indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import psycopg2
import rlp
import subprocess
import os

from eth_utils import big_endian_to_int
from solana.account import Account
Expand All @@ -29,6 +30,8 @@
from proxy.indexer.pg_common import encode, decode


FINALIZED = os.environ.get('FINALIZED', 'finalized')

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

Expand Down Expand Up @@ -276,7 +279,7 @@ def __init__(self):
def _create_table_sql(self) -> str:
assert False, 'No script for the table'

def _fetchone(self, values, keys) -> str:
def _fetchone(self, values, keys, order_list=None) -> str:
cursor = self._conn.cursor()

where_cond = '1=1'
Expand All @@ -285,8 +288,11 @@ def _fetchone(self, values, keys) -> str:
where_cond += f' AND {name} = %s'
where_keys.append(value)

logger.debug(f'SELECT {",".join(values)} FROM {self._table_name} WHERE {where_cond}')
cursor.execute(f'SELECT {",".join(values)} FROM {self._table_name} WHERE {where_cond}', where_keys)
order_cond = ''
if order_list:
order_cond = 'ORDER BY ' + ', '.join(order_list)

cursor.execute(f'SELECT {",".join(values)} FROM {self._table_name} WHERE {where_cond} {order_cond}', where_keys)
return cursor.fetchone()

def __del__(self):
Expand Down Expand Up @@ -319,7 +325,7 @@ def _create_table_sql(self) -> str:

json TEXT,

UNIQUE(transactionLogIndex, transactionHash, topic)
UNIQUE(transactionLogIndex, transactionHash, topic, slot, finalized)
);
CREATE INDEX IF NOT EXISTS {self._table_name}_finalized ON {self._table_name}(slot, finalized);
"""
Expand Down
4 changes: 4 additions & 0 deletions proxy/run-proxy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ if [ "$CONFIG" == "ci" ]; then
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
[[ -z "$RETRY_ON_BLOCKED" ]] && export RETRY_ON_BLOCKED=32
[[ -z "$RETRY_ON_FAIL" ]] && export RETRY_ON_FAIL=10
[[ -z "$FINALIZED" ]] && export FINALIZED="confirmed"
elif [ "$CONFIG" == "local" ]; then
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899"
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0
Expand All @@ -23,6 +24,7 @@ elif [ "$CONFIG" == "local" ]; then
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
[[ -z "$RETRY_ON_BLOCKED" ]] && export RETRY_ON_BLOCKED=32
[[ -z "$RETRY_ON_FAIL" ]] && export RETRY_ON_FAIL=10
[[ -z "$FINALIZED" ]] && export FINALIZED="confirmed"
elif [ "$CONFIG" == "devnet" ]; then
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com"
[[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU
Expand All @@ -33,6 +35,7 @@ elif [ "$CONFIG" == "devnet" ]; then
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
[[ -z "$RETRY_ON_BLOCKED" ]] && export RETRY_ON_BLOCKED=32
[[ -z "$RETRY_ON_FAIL" ]] && export RETRY_ON_FAIL=10
[[ -z "$FINALIZED" ]] && export FINALIZED="finalized"
elif [ "$CONFIG" == "testnet" ]; then
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com"
[[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU
Expand All @@ -43,6 +46,7 @@ elif [ "$CONFIG" == "testnet" ]; then
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
[[ -z "$RETRY_ON_BLOCKED" ]] && export RETRY_ON_BLOCKED=32
[[ -z "$RETRY_ON_FAIL" ]] && export RETRY_ON_FAIL=10
[[ -z "$FINALIZED" ]] && export FINALIZED="finalized"
elif [ "$CONFIG" != "custom" ]; then
exit 1
fi
Expand Down
1 change: 1 addition & 0 deletions run-airdropper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ if [ -z "$EVM_LOADER" ]; then
echo "EVM_LOADER=$EVM_LOADER"
fi
export AIRDROPPER_MODE='true'
[[ -z "$FINALIZED" ]] && export FINALIZED="confirmed"

python3 -m proxy
1 change: 1 addition & 0 deletions run-test-airdropper.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/bin/bash

[[ -z "$FINALIZED" ]] && export FINALIZED="confirmed"
export EVM_LOADER=$(solana address -k /spl/bin/evm_loader-keypair.json)
python3 -m proxy