Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add finalized stage for neon transactions and blocks
  • Loading branch information
afalaleev committed Jan 13, 2022
commit 1e7c600a47b89a74106776e34287ad47270f46c8
File renamed without changes.
3 changes: 1 addition & 2 deletions proxy/common_neon/transaction_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
from .solana_interactor import SolanaInteractor, check_for_errors,\
check_if_program_exceeded_instructions, check_if_accounts_blocked, get_logs_from_reciept
from ..environment import EVM_LOADER_ID, RETRY_ON_BLOCKED
from ..plugin.eth_proto import Trx as EthTrx
from ..indexer.utils import NeonTxResultInfo

from ..common_neon.eth_proto import Trx as EthTrx

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down
112 changes: 112 additions & 0 deletions proxy/indexer/blocks_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from proxy.indexer.pg_common import encode, decode
from .utils import BaseDB, str_fmt_object


class SolanaBlockDBInfo:
def __init__(self, slot=None, finalized=False, height=None, hash=None, parent_hash=None, time=None, signs=None):
self.slot = slot
self.finalized = finalized
self.height = height
self.hash = hash
self.parent_hash = parent_hash
self.time = time
self.signs = signs

def __str__(self):
return str_fmt_object(self)


class SolanaBlocksDB(BaseDB):
def __init__(self):
BaseDB.__init__(self)
self._column_lst = ('slot', 'finalized', 'height', 'hash')
self._full_column_lst = ('slot', 'finalized', 'height', 'hash', 'parent_hash', 'blocktime', 'signatures')

def _create_table_sql(self) -> str:
self._table_name = 'solana_blocks'
return f"""
CREATE TABLE IF NOT EXISTS {self._table_name} (
slot BIGINT,
finalized BOOLEAN,
height BIGINT,
hash CHAR(66),

parent_hash CHAR(66),
blocktime BIGINT,
signatures BYTEA,

UNIQUE(slot)
);
CREATE INDEX IF NOT EXISTS {self._table_name}_hash ON {self._table_name}(hash);
CREATE INDEX IF NOT EXISTS {self._table_name}_height ON {self._table_name}(height);
"""

def _block_from_value(self, value, slot=None) -> SolanaBlockDBInfo:
if not value:
return SolanaBlockDBInfo(slot=slot)

return SolanaBlockDBInfo(
slot=value[0],
finalized=value[1],
height=value[2],
hash=value[3],
)

def _full_block_from_value(self, value, slot=None) -> SolanaBlockDBInfo:
if not value:
return SolanaBlockDBInfo(slot=slot)

return SolanaBlockDBInfo(
slot=value[0],
finalized=value[1],
height=value[2],
hash=value[3],
parent_hash=value[4],
time=value[5],
signs=decode(value[6]),
)

def get_block_by_slot(self, block_slot) -> SolanaBlockDBInfo:
return self._block_from_value(self._fetchone(self._column_lst, [('slot', block_slot)]), block_slot)

def get_full_block_by_slot(self, block_slot) -> SolanaBlockDBInfo:
return self._full_block_from_value(self._fetchone(self._full_column_lst, [('slot', block_slot)]), block_slot)

def get_block_by_hash(self, block_hash) -> SolanaBlockDBInfo:
return self._block_from_value(self._fetchone(self._column_lst, [('hash', block_hash)]))

def get_block_by_height(self, block_num) -> SolanaBlockDBInfo:
return self._block_from_value(self._fetchone(self._column_lst, [('height', block_num)]))

def set_block(self, block: SolanaBlockDBInfo):
cursor = self._conn.cursor()
cursor.execute(f'''
INSERT INTO {self._table_name}
({', '.join(self._full_column_lst)})
VALUES
({', '.join(['%s' for _ in range(len(self._full_column_lst))])})
ON CONFLICT (slot) DO UPDATE SET
hash=EXCLUDED.hash,
height=EXCLUDED.height,
parent_hash=EXCLUDED.parent_hash,
blocktime=EXCLUDED.blocktime,
signatures=EXCLUDED.signatures
''',
(block.slot, block.finalized, block.height, block.hash,
block.parent_hash, block.time, encode(block.signs)))

def fill_block_height(self, height, slots):
rows = []
for slot in slots:
rows.append((slot, height))
height += 1

cursor = self._conn.cursor()
cursor.executemany(
f'INSERT INTO {self._table_name}(slot, finalized, height) VALUES(%s, True, %s) ON CONFLICT DO NOTHING',
rows)

def del_not_finalized(self, from_slot: int, to_slot: int):
cursor = self._conn.cursor()
cursor.execute(f'DELETE FROM {self._table_name} WHERE slot >= %s AND slot <= %s AND finalized = false',
(from_slot, to_slot))
65 changes: 34 additions & 31 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
try:
from indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
from indexer_db import IndexerDB
from utils import check_error, NeonIxSignInfo, NeonTxResultInfo, NeonTxSignInfo, Canceller, str_fmt_object
from utils import SolanaIxSignInfo, NeonTxResultInfo, NeonTxSignInfo, Canceller, str_fmt_object
except ImportError:
from .indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
from .indexer_db import IndexerDB
from .utils import check_error, NeonIxSignInfo, NeonTxResultInfo, NeonTxAddrInfo, Canceller, str_fmt_object
from .utils import SolanaIxSignInfo, NeonTxResultInfo, NeonTxInfo, Canceller, str_fmt_object

from ..environment import EVM_LOADER_ID

CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))
UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16


class NeonIxInfo:
def __init__(self, sign: bytes, slot: int, tx: {}):
self.sign = NeonIxSignInfo(sign=sign, slot=slot, idx=-1)
class SolanaIxInfo:
def __init__(self, sign: str, slot: int, tx: {}):
self.sign = SolanaIxSignInfo(sign=sign, slot=slot, idx=-1)
self.tx = tx
self._is_valid = isinstance(tx, dict)
self._msg = self.tx['transaction']['message'] if self._is_valid else None
Expand Down Expand Up @@ -61,7 +61,7 @@ def iter_ixs(self):
evm_ix_idx = -1
for ix_idx, self.ix in tx_ixs:
# Make a new object to keep values in existing
self.sign = NeonIxSignInfo(sign=self.sign.sign, slot=self.sign.slot, idx=ix_idx)
self.sign = SolanaIxSignInfo(sign=self.sign.sign, slot=self.sign.slot, idx=ix_idx)
if 'programIdIndex' not in self.ix:
logger.debug(f'{self} error: fail to get program id')
continue
Expand Down Expand Up @@ -102,7 +102,7 @@ def __init__(self):
self.used_ixs = []
self.slot = 0

def mark_ix_used(self, ix_info: NeonIxInfo):
def mark_ix_used(self, ix_info: SolanaIxInfo):
self.used_ixs.append(ix_info.sign)
self.slot = max(self.slot, ix_info.sign.slot)

Expand All @@ -125,10 +125,10 @@ def __str__(self):


class NeonTxObject(BaseEvmObject):
def __init__(self, storage_account: str, neon_tx: NeonTxAddrInfo, neon_res: NeonTxResultInfo):
def __init__(self, storage_account: str, neon_tx: NeonTxInfo, neon_res: NeonTxResultInfo):
BaseEvmObject.__init__(self)
self.storage_account = storage_account
self.neon_tx = (neon_tx or NeonTxAddrInfo())
self.neon_tx = (neon_tx or NeonTxInfo())
self.neon_res = (neon_res or NeonTxResultInfo())
self.step_count = []
self.holder_account = ''
Expand Down Expand Up @@ -175,9 +175,9 @@ def __init__(self, db: IndexerDB, client):
self._tx_table = {}
self._done_tx_list = []
self._used_ixs = {}
self.ix = NeonIxInfo(sign=bytes(), slot=-1, tx=None)
self.ix = SolanaIxInfo(sign='', slot=-1, tx=None)

def set_ix(self, ix_info: NeonIxInfo):
def set_ix(self, ix_info: SolanaIxInfo):
self.ix = ix_info

def mark_ix_used(self, obj: BaseEvmObject):
Expand Down Expand Up @@ -239,7 +239,7 @@ def complete_done_txs(self):
for tx in self._done_tx_list:
self.unmark_ix_used(tx)
if tx.neon_tx.is_valid() and tx.neon_res.is_valid():
self._db.submit_transaction(self._client, tx.neon_tx, tx.neon_res, tx.used_ixs)
self._db.submit_transaction(tx.neon_tx, tx.neon_res, tx.used_ixs)
self.del_tx(tx)
self._done_tx_list.clear()

Expand All @@ -257,7 +257,7 @@ def __str__(self):
return f'{self.name} {self.state.ix}'

@staticmethod
def neon_addr_fmt(neon_tx: NeonTxAddrInfo):
def neon_addr_fmt(neon_tx: NeonTxInfo):
return f'Neon tx {neon_tx.sign}, Neon addr {neon_tx.addr}'

def _getadd_tx(self, storage_account, neon_tx=None, blocked_accounts=[str]) -> NeonTxObject:
Expand Down Expand Up @@ -344,9 +344,7 @@ def _decode_tx(self, tx: NeonTxObject):
the parsing order can be other than the execution order
"""
if not tx.neon_res.is_valid():
res_error = tx.neon_res.decode(self.ix.tx, self.ix.sign.idx)
if res_error:
return self._decoding_fail(tx, 'Neon results error')
tx.neon_res.decode(self.ix.tx, self.ix.sign.idx)
if tx.neon_res.is_valid():
return self._decoding_done(tx, 'found Neon results')
return self._decoding_success(tx, 'mark ix used')
Expand Down Expand Up @@ -397,7 +395,7 @@ def __str__(self):
def is_valid(self) -> bool:
return (self.length > 0) and (len(self.data) == self.length)

def _decode_datachunck(self, ix_info: NeonIxInfo) -> _DataChunk:
def _decode_datachunck(self, ix_info: SolanaIxInfo) -> _DataChunk:
# No enough bytes to get length of chunk
if len(ix_info.ix_data) < 17:
return self._DataChunk()
Expand Down Expand Up @@ -436,7 +434,7 @@ class WriteWithHolderIxDecoder(WriteIxDecoder):
def __init__(self, state: ReceiptsParserState):
DummyIxDecoder.__init__(self, 'WriteWithHolder', state)

def _decode_datachunck(self, ix_info: NeonIxInfo) -> WriteIxDecoder._DataChunk:
def _decode_datachunck(self, ix_info: SolanaIxInfo) -> WriteIxDecoder._DataChunk:
# No enough bytes to get length of chunk
if len(ix_info.ix_data) < 22:
return self._DataChunk()
Expand All @@ -461,15 +459,11 @@ def execute(self) -> bool:
rlp_sign = self.ix.ix_data[25:90]
rlp_data = self.ix.ix_data[90:]

neon_tx = NeonTxAddrInfo(rlp_sign=rlp_sign, rlp_data=rlp_data)
neon_tx = NeonTxInfo(rlp_sign=rlp_sign, rlp_data=rlp_data)
if neon_tx.error:
return self._decoding_skip(f'Neon tx rlp error "{neon_tx.error}"')

neon_res = NeonTxResultInfo(self.ix.tx, self.ix.sign.idx)
if neon_res.error:
return self._decoding_skip(f'Neon results error "{neon_res.error}"')

tx = NeonTxObject('', neon_tx=neon_tx, neon_res=neon_res)
tx = NeonTxObject('', neon_tx=neon_tx, neon_res=NeonTxResultInfo(self.ix.tx, self.ix.sign.idx))
return self._decoding_done(tx, 'call success')


Expand All @@ -493,7 +487,7 @@ def execute(self) -> bool:
rlp_sign = self.ix.ix_data[33:98]
rlp_data = self.ix.ix_data[98:]

neon_tx = NeonTxAddrInfo(rlp_sign=rlp_sign, rlp_data=rlp_data)
neon_tx = NeonTxInfo(rlp_sign=rlp_sign, rlp_data=rlp_data)
if neon_tx.error:
return self._decoding_skip(f'Neon tx rlp error "{neon_tx.error}"')

Expand Down Expand Up @@ -626,7 +620,7 @@ def __init__(self,
evm_loader_id,
log_level = 'INFO'):
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level, 0)
self.db = IndexerDB()
self.db = IndexerDB(self.client)
self.canceller = Canceller()
self.blocked_storages = {}
self.processed_slot = self.db.get_min_receipt_slot()
Expand Down Expand Up @@ -658,24 +652,29 @@ def __init__(self,

def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}

def process_receipts(self):
start_time = time.time()

max_slot = 0
max_slot = self.processed_slot - 1
last_block_slot = self.db.get_last_block_slot()

for slot, sign, tx in self.transaction_receipts.get_trxs(self.processed_slot, reverse=False):
if slot > last_block_slot:
break

if max_slot != slot:
self.state.complete_done_txs()
max_slot = max(max_slot, slot)

ix_info = NeonIxInfo(slot=slot, sign=sign, tx=tx)
ix_info = SolanaIxInfo(sign=sign, slot=slot, tx=tx)

for _ in ix_info.iter_ixs():
self.state.set_ix(ix_info)
Expand All @@ -687,7 +686,7 @@ def process_receipts(self):
for tx in self.state.iter_txs():
if tx.storage_account and abs(tx.slot - self.current_slot) > CANCEL_TIMEOUT:
logger.debug(f'Neon tx is blocked: storage {tx.storage_account}, {tx.neon_tx}')
self.blocked_storages[tx.storage_account] = (tx.neon_tx.rlp_tx, tx.blocked_accounts)
self.blocked_storages[tx.storage_account] = (tx.neon_tx, tx.blocked_accounts)

self.processed_slot = max(self.processed_slot, max_slot + 1)
self.db.set_min_receipt_slot(self.state.find_min_used_slot(self.processed_slot))
Expand All @@ -699,6 +698,7 @@ def gather_blocks(self):
start_time = time.time()
last_block_slot = self.db.get_last_block_slot()
max_height = self.db.get_last_block_height()
start_block_slot = last_block_slot
height = -1
confirmed_blocks_len = 10000
client = self.client._provider
Expand All @@ -725,6 +725,9 @@ def gather_blocks(self):
break

# Everything is good
logger.debug(f'remove not finalized data in range[{start_block_slot}..{last_block_slot}]')
self.db.del_not_finalized(from_slot=start_block_slot, to_slot=last_block_slot)
start_block_slot = last_block_slot
logger.debug(f"gather_blocks from {height} to {max_height}")
self.db.fill_block_height(height, confirmed_blocks)
self.db.set_last_slot_height(last_block_slot, max_height)
Expand Down
2 changes: 1 addition & 1 deletion proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)

DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]
Expand Down
Loading