Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Use slot for block number. #580
  • Loading branch information
afalaleev committed Feb 17, 2022
commit aa2d446a1f5a9f766b68f6ab1a75b081e0f6e0d7
1 change: 0 additions & 1 deletion proxy/common_neon/solana_interactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ def get_block_info_list(self, block_slot_list: [int], commitment='confirmed') ->
slot=slot,
finalized=(commitment == FINALIZED),
hash='0x' + base58.b58decode(net_block['blockhash']).hex(),
height=net_block['blockHeight'],
parent_hash='0x' + base58.b58decode(net_block['previousBlockhash']).hex(),
time=net_block['blockTime'],
signs=net_block['signatures']
Expand Down
7 changes: 2 additions & 5 deletions proxy/common_neon/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ def str_fmt_object(obj):


class SolanaBlockInfo:
def __init__(self, slot=None, finalized=False, height=None, hash=None, parent_hash=None, time=None, signs=None):
def __init__(self, slot=None, finalized=False, hash=None, parent_hash=None, time=None, signs=None):
self.slot = slot
self.finalized = finalized
self.height = height
self.hash = hash
self.parent_hash = parent_hash
self.time = time
Expand Down Expand Up @@ -62,7 +61,6 @@ def _set_defaults(self):
self.return_value = bytes()
self.sol_sign = None
self.slot = -1
self.block_height = -1
self.block_hash = ''
self.idx = -1

Expand Down Expand Up @@ -100,11 +98,10 @@ def _decode_return(self, log: bytes, ix_idx: int, tx: {}):

def fill_block_info(self, block: SolanaBlockInfo):
self.slot = block.slot
self.block_height = block.height
self.block_hash = block.hash
for rec in self.logs:
rec['blockHash'] = block.hash
rec['blockNumber'] = hex(block.height)
rec['blockNumber'] = hex(block.slot)

def decode(self, neon_sign: str, tx: {}, ix_idx=-1) -> NeonTxResultInfo:
self._set_defaults()
Expand Down
135 changes: 25 additions & 110 deletions proxy/indexer/blocks_db.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import psycopg2
import psycopg2.extras
from typing import Optional

from ..indexer.utils import BaseDB, DBQuery
from ..common_neon.utils import SolanaBlockInfo


class SolanaBlocksDB(BaseDB):
def __init__(self):
BaseDB.__init__(self)
self._column_lst = ('slot', 'hash')
self._full_column_lst = ('slot', 'hash', 'parent_hash', 'blocktime', 'signatures')

def _create_table_sql(self) -> str:
self._table_name = 'solana_block'
return f"""
CREATE TABLE IF NOT EXISTS {self._table_name}_heights (
slot BIGINT,
height BIGINT,

UNIQUE(slot),
UNIQUE(height)
);
CREATE TABLE IF NOT EXISTS {self._table_name}_hashes (
CREATE TABLE IF NOT EXISTS {self._table_name} (
slot BIGINT,
hash CHAR(66),

Expand All @@ -32,127 +26,48 @@ def _create_table_sql(self) -> str:
);
"""

def _fetch_block(self, slot, q: DBQuery) -> SolanaBlockInfo:
e = self._build_expression(q)

request = f'''
SELECT a.slot, a.height, b.hash
FROM {self._table_name}_heights AS a
LEFT JOIN {self._table_name}_hashes AS b
ON a.slot = b.slot
WHERE {e.where_expr}
{e.order_expr}
LIMIT 1
'''

with self._conn.cursor() as cursor:
cursor.execute(request, e.where_keys)
values = cursor.fetchone()

def _block_from_value(self, slot: Optional[int], values: []) -> SolanaBlockInfo:
if not values:
return SolanaBlockInfo(slot=slot)

return SolanaBlockInfo(
finalized=True,
slot=values[0],
height=values[1],
hash=values[2],
hash=values[1],
)

def _fetch_full_block(self, slot, q: DBQuery) -> SolanaBlockInfo:
e = self._build_expression(q)

request = f'''
SELECT a.slot, a.height, b.hash, b.parent_hash, b.blocktime, b.signatures
FROM {self._table_name}_heights AS a
LEFT JOIN {self._table_name}_hashes AS b
ON a.slot = b.slot
WHERE {e.where_expr}
{e.order_expr}
LIMIT 1
'''

with self._conn.cursor() as cursor:
cursor.execute(request, e.where_keys)
values = cursor.fetchone()

def _full_block_from_value(self, slot: Optional[int], values: []) -> SolanaBlockInfo:
if not values:
return SolanaBlockInfo(slot=slot)

return SolanaBlockInfo(
finalized=True,
slot=values[0],
height=values[1],
hash=values[2],
parent_hash=values[3],
time=values[4],
signs=self.decode_list(values[5])
hash=values[1],
parent_hash=values[2],
time=values[3],
signs=self.decode_list(values[4])
)

def get_latest_block(self) -> SolanaBlockInfo:
q = DBQuery(column_list=[], key_list=[], order_list=['a.slot DESC'])
return self._fetch_block(None, q)

def get_latest_block_list(self, limit: int) -> [SolanaBlockInfo]:
request = f'''
SELECT a.slot, a.height, b.hash, b.parent_hash, b.blocktime, b.signatures
FROM {self._table_name}_heights AS a
LEFT JOIN {self._table_name}_hashes AS b
ON a.slot = b.slot
ORDER BY a.slot DESC
LIMIT {limit}
'''

with self._conn.cursor() as cursor:
cursor.execute(request, [])
values = cursor.fetchall()

if not values:
return []
return [
SolanaBlockInfo(
finalized=True,
slot=value[0],
height=value[1],
hash=value[2],
parent_hash=value[3],
time=value[4],
signs=self.decode_list(value[5])
) for value in values
]

def get_block_by_slot(self, block_slot: int) -> SolanaBlockInfo:
q = DBQuery(column_list=[], key_list=[('a.slot', block_slot)], order_list=[])
return self._fetch_block(block_slot, q)
q = DBQuery(column_list=self._column_lst, key_list=[('slot', block_slot)], order_list=[])
return self._block_from_value(block_slot, self._fetchone(q))

def get_full_block_by_slot(self, block_slot) -> SolanaBlockInfo:
q = DBQuery(column_list=[], key_list=[('a.slot', block_slot)], order_list=[])
return self._fetch_full_block(block_slot, q)
q = DBQuery(column_list=self._full_column_lst, key_list=[('slot', block_slot)], order_list=[])
return self._block_from_value(block_slot, self._fetchone(q))

def get_block_by_hash(self, block_hash) -> SolanaBlockInfo:
q = DBQuery(column_list=[], key_list=[('b.hash', block_hash)], order_list=[])
return self._fetch_block(None, q)

def get_block_by_height(self, block_num) -> SolanaBlockInfo:
q = DBQuery(column_list=[], key_list=[('a.height', block_num)], order_list=[])
return self._fetch_block(None, q)
q = DBQuery(column_list=self._column_lst, key_list=[('block_hash', block_hash)], order_list=[])
return self._block_from_value(None, self._fetchone(q))

def set_block(self, block: SolanaBlockInfo):
cursor = self._conn.cursor()
cursor.execute(f'''
INSERT INTO {self._table_name}_hashes
({', '.join(self._full_column_lst)})
VALUES
({', '.join(['%s' for _ in range(len(self._full_column_lst))])})
ON CONFLICT DO NOTHING;
''',
(block.slot, block.hash, block.parent_hash, block.time, self.encode_list(block.signs)))

def fill_block_height(self, height, slots):
with self._conn.cursor() as cursor:
psycopg2.extras.execute_values(cursor, f"""
INSERT INTO {self._table_name}_heights
(slot, height)
VALUES %s
ON CONFLICT DO NOTHING
""", ((slot, height+idx) for idx, slot in enumerate(slots)), template="(%s, %s)", page_size=1000)
cursor.execute(f'''
INSERT INTO {self._table_name}
({', '.join(self._full_column_lst)})
VALUES
({', '.join(['%s' for _ in range(len(self._full_column_lst))])})
ON CONFLICT DO NOTHING;
''',
(block.slot, block.hash, block.parent_hash, block.time, self.encode_list(block.signs)))
57 changes: 5 additions & 52 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,45 +681,15 @@ def __init__(self, db: IndexerDB, solana_client: Client):

def gather_blocks(self):
start_time = time.time()
latest_block = self.db.get_latest_block()
height = -1
min_height = height
confirmed_blocks_len = 10000
client = self.solana_client._provider
list_opts = {"commitment": FINALIZED}
block_opts = {"commitment": FINALIZED, "transactionDetails": "none", "rewards": False}
while confirmed_blocks_len == 10000:
confirmed_blocks = client.make_request("getBlocksWithLimit", latest_block.slot, confirmed_blocks_len, list_opts)['result']
confirmed_blocks_len = len(confirmed_blocks)
# No more blocks
if confirmed_blocks_len == 0:
break

# Intitialize start height
if height == -1:
first_block = client.make_request("getBlock", confirmed_blocks[0], block_opts)
height = first_block['result']['blockHeight']

# Validate last block height
latest_block.height = height + confirmed_blocks_len - 1
latest_block.slot = confirmed_blocks[confirmed_blocks_len - 1]
last_block = client.make_request("getBlock", latest_block.slot, block_opts)
if not last_block['result'] or last_block['result']['blockHeight'] != latest_block.height:
self.warning(f"FAILED last_block_height {latest_block.height} " +
f"last_block_slot {latest_block.slot} " +
f"last_block {last_block}")
break

# Everything is good
min_height = min(min_height, height) if min_height > 0 else height
self.db.fill_block_height(height, confirmed_blocks)
height = latest_block.height

opts = {"commitment": FINALIZED}
slot = client.make_request('getSlot', opts)['result']
self.db.set_latest_block(slot)
gather_blocks_ms = (time.time() - start_time) * 1000 # convert this into milliseconds
self.counted_logger.print(
self.debug,
list_params={"gather_blocks_ms": gather_blocks_ms, "processed_height": latest_block.height - min_height},
latest_params={"last_block_slot": latest_block.slot}
list_params={"gather_blocks_ms": gather_blocks_ms},
latest_params={"last_block_slot": slot}
)


Expand All @@ -734,7 +704,6 @@ def __init__(self, solana_url, evm_loader_id):
self.db.set_client(self.solana_client)
self.canceller = Canceller()
self.blocked_storages = {}
self._init_last_height_slot()
self.block_indexer = BlocksIndexer(db=self.db, solana_client=self.solana_client)
self.counted_logger = MetricsToLogBuff()

Expand Down Expand Up @@ -764,22 +733,6 @@ def __init__(self, solana_url, evm_loader_id):
}
self.def_decoder = DummyIxDecoder('Unknown', self.state)

def _init_last_height_slot(self):
last_known_slot = self.db.get_latest_block().slot
slot = self._init_last_slot('height', last_known_slot)
if last_known_slot == slot:
return

block_opts = {"commitment": FINALIZED, "transactionDetails": "none", "rewards": False}
client = self.solana_client._provider
block = client.make_request("getBlock", slot, block_opts)
if not block['result']:
self.warning(f"Solana haven't return block information for the slot {slot}")
return

height = block['result']['blockHeight']
self.db.fill_block_height(height, [slot])

def process_functions(self):
self.block_indexer.gather_blocks()
IndexerBase.process_functions(self)
Expand Down
17 changes: 5 additions & 12 deletions proxy/indexer/indexer_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self):
self._client = None

self._constants = SQLDict(tablename="constants")
for k in ['min_receipt_slot']:
for k in ['min_receipt_slot', 'latest_slot']:
if k not in self._constants:
self._constants[k] = 0

Expand Down Expand Up @@ -61,7 +61,6 @@ def _fill_block_from_net(self, block: SolanaBlockInfo):
return block

block.hash = '0x' + base58.b58decode(net_block['blockhash']).hex()
block.height = net_block['blockHeight']
block.signs = net_block['signatures']
block.parent_hash = '0x' + base58.b58decode(net_block['previousBlockhash']).hex()
block.time = net_block['blockTime']
Expand Down Expand Up @@ -104,18 +103,15 @@ def get_full_block_by_slot(self, slot) -> SolanaBlockInfo:
return block

def get_latest_block(self) -> SolanaBlockInfo:
return self._blocks_db.get_latest_block()
return SolanaBlockInfo(slot=self._constants['latest_slot'])

def get_latest_block_list(self, limit: int) -> [SolanaBlockInfo]:
return self._blocks_db.get_latest_block_list(limit)

def fill_block_height(self, number, slots):
self._blocks_db.fill_block_height(number, slots)
def set_latest_block(self, slot: int):
self._constants['latest_slot'] = slot

def get_min_receipt_slot(self) -> int:
return self._constants['min_receipt_slot']

def set_min_receipt_slot(self, slot):
def set_min_receipt_slot(self, slot: int):
self._constants['min_receipt_slot'] = slot

def get_logs(self, from_block, to_block, addresses, topics, block_hash):
Expand All @@ -124,9 +120,6 @@ def get_logs(self, from_block, to_block, addresses, topics, block_hash):
def get_block_by_hash(self, block_hash: str) -> SolanaBlockInfo:
return self._blocks_db.get_block_by_hash(block_hash)

def get_block_by_height(self, block_height: int) -> SolanaBlockInfo:
return self._blocks_db.get_block_by_height(block_height)

def get_tx_list_by_sol_sign(self, sol_sign_list: [str]) -> [NeonTxFullInfo]:
tx_list = self._txs_db.get_tx_list_by_sol_sign(sol_sign_list)
block = None
Expand Down
2 changes: 1 addition & 1 deletion proxy/indexer/logs_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def push_logs(self, logs, block):
(
log['address'],
block.hash,
block.height,
block.slot,
log['transactionHash'],
int(log['transactionLogIndex'], 16),
topic,
Expand Down
3 changes: 1 addition & 2 deletions proxy/indexer/transactions_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def set_txs(self, neon_sign: str, used_ixs: [SolanaIxSignInfo]):
class NeonTxsDB(BaseDB):
def __init__(self):
BaseDB.__init__(self)
self._column_lst = ('neon_sign', 'from_addr', 'sol_sign', 'slot', 'block_height', 'block_hash', 'idx',
self._column_lst = ('neon_sign', 'from_addr', 'sol_sign', 'slot', 'block_hash', 'idx',
'nonce', 'gas_price', 'gas_limit', 'to_addr', 'contract', 'value', 'calldata',
'v', 'r', 's', 'status', 'gas_used', 'return_value', 'logs')
self._sol_neon_txs_db = SolanaNeonTxsDB()
Expand All @@ -51,7 +51,6 @@ def _create_table_sql(self) -> str:
from_addr CHAR(42),
sol_sign CHAR(88),
slot BIGINT,
block_height BIGINT,
block_hash CHAR(66),
idx INT,

Expand Down
2 changes: 1 addition & 1 deletion proxy/indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _build_expression(self, q: DBQuery) -> DBQueryExpression:
order_expr='ORDER BY ' + ', '.join(q.order_list) if len(q.order_list) else '',
)

def _fetchone(self, query: DBQuery) -> str:
def _fetchone(self, query: DBQuery) -> []:
e = self._build_expression(query)

request = f'''
Expand Down
Loading