Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use SolTxSender for transaction sending.
  • Loading branch information
afalaleev committed Feb 20, 2022
commit 34d667559a65d40ba080676a4ffcc1c29c269572
9 changes: 5 additions & 4 deletions proxy/common_neon/permission_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Union
from solana.transaction import Transaction
import spl.token.instructions as spl_token
from proxy.common_neon.solana_interactor import SolanaInteractor
from proxy.common_neon.solana_interactor import SolanaInteractor, SolTxListSender
from decimal import Decimal
import os

Expand All @@ -15,8 +15,9 @@ def __init__(self,
token_mint: PublicKey,
payer: SolanaAccount):
self.solana = solana
self.signer = payer
self.waiter = None
self.token_mint = token_mint
self.payer = payer

def get_token_account_address(self, ether_addr: Union[str, EthereumAddress]):
sol_addr = PublicKey(ether2program(ether_addr)[0])
Expand All @@ -35,12 +36,12 @@ def create_account_if_needed(self,

txn = Transaction()
create_txn = spl_token.create_associated_token_account(
payer=self.payer.public_key(),
payer=self.signer.public_key(),
owner=PublicKey(ether2program(ether_addr)[0]),
mint=self.token_mint
)
txn.add(create_txn)
self.solana.send_multiple_transactions(self.payer, [txn], skip_preflight=True)
SolTxListSender(self, [txn], 'CreateAssociatedTokenAccount(1)', skip_preflight=True).send()
return token_account

def mint_to(self,
Expand Down
221 changes: 183 additions & 38 deletions proxy/common_neon/solana_interactor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import base58
import base64
import json
Expand All @@ -14,19 +16,30 @@
from solana.transaction import Transaction
from itertools import zip_longest
from logged_groups import logged_group
from typing import Dict, Union
from typing import Dict, Union, Any, List, NamedTuple, cast
from base58 import b58decode, b58encode

from .costs import update_transaction_cost
from .utils import get_from_dict, SolanaBlockInfo
from ..environment import EVM_LOADER_ID, CONFIRMATION_CHECK_DELAY, WRITE_TRANSACTION_COST_IN_DB, SKIP_PREFLIGHT
from ..environment import LOG_SENDING_SOLANA_TRANSACTION, FUZZING_BLOCKHASH, CONFIRM_TIMEOUT, FINALIZED
from ..environment import RETRY_ON_FAIL

from ..common_neon.layouts import ACCOUNT_INFO_LAYOUT
from ..common_neon.address import EthereumAddress, ether2program
from ..common_neon.address import AccountInfo as NeonAccountInfo

from typing import Any, List, NamedTuple, cast

class SolTxError(Exception):
def __init__(self, receipt):
self.result = receipt
error = get_error_definition_from_receipt(receipt)
if isinstance(error, list) and isinstance(error[1], str):
super().__init__(str(error[1]))
self.error = str(error[1])
else:
super().__init__('Unknown error')
self.error = json.dumps(receipt)


class AccountInfo(NamedTuple):
Expand Down Expand Up @@ -146,7 +159,7 @@ def get_account_info(self, pubkey: PublicKey, length=256, commitment='confirmed'

return AccountInfo(account_tag, lamports, owner, data)

def get_multiple_accounts_info(self, accounts: [PublicKey], length=256, commitment='confirmed') -> [AccountInfo]:
def get_account_info_list(self, accounts: [PublicKey], length=256, commitment='confirmed') -> [AccountInfo]:
opts = {
"encoding": "base64",
"commitment": commitment,
Expand Down Expand Up @@ -368,9 +381,8 @@ def _send_multiple_transactions(self, signer: SolanaAccount, tx_list: [Transacti
response_list = self._send_rpc_batch_request('sendTransaction', request_list)
return [SendResult(result=r.get('result'), error=r.get('error')) for r in response_list]

def send_multiple_transactions(self, signer, tx_list,
eth_tx=None, reason=None, waiter=None,
skip_preflight=SKIP_PREFLIGHT, preflight_commitment='confirmed') -> [{}]:
def send_multiple_transactions(self, signer: SolanaAccount, tx_list: [], waiter,
skip_preflight: bool, preflight_commitment: str) -> [{}]:
send_result_list = self._send_multiple_transactions(signer, tx_list, skip_preflight, preflight_commitment)
# Filter good transactions and wait the confirmations for them
sign_list = [s.result for s in send_result_list if s.result]
Expand All @@ -386,28 +398,8 @@ def send_multiple_transactions(self, signer, tx_list,
else:
receipt_list.append(confirmed_list.pop(0))

if eth_tx and reason and WRITE_TRANSACTION_COST_IN_DB:
for receipt in receipt_list:
update_transaction_cost(receipt, eth_tx, reason)

return receipt_list

# Do not rename this function! This name used in CI measurements (see function `cleanup_docker` in
# .buildkite/steps/deploy-test.sh)
def get_measurements(self, reason, eth_tx, receipt):
if not LOG_SENDING_SOLANA_TRANSACTION:
return

try:
self.debug(f"send multiple transactions for reason {reason}")

measurements = self._extract_measurements_from_receipt(receipt)
for m in measurements:
self.info(f'get_measurements: {json.dumps(m)}')
except Exception as err:
self.error(f"get_measurements: can't get measurements {err}")
self.info(f"get measurements: failed result {json.dumps(receipt, indent=3)}")

def _confirm_multiple_transactions(self, sign_list: [str], waiter=None):
"""Confirm a transaction."""
if not len(sign_list):
Expand Down Expand Up @@ -459,6 +451,157 @@ def _get_multiple_receipts(self, sign_list: [str]) -> [Any]:
response_list = self._send_rpc_batch_request("getTransaction", request_list)
return [r['result'] for r in response_list]


@logged_group("neon.Proxy")
class SolTxListSender:
def __init__(self, sender, tx_list: [Transaction], name: str,
skip_preflight=SKIP_PREFLIGHT, preflight_commitment='confirmed'):
self._s = sender
self._name = name
self._skip_preflight = skip_preflight
self._preflight_commitment = preflight_commitment

self._blockhash = None
self._retry_idx = 0
self._tx_list = tx_list
self._bad_block_list = []
self._blocked_account_list = []
self._pending_list = []
self._budget_exceeded_list = []
self._storage_bad_status_list = []

self._all_tx_list = [self._bad_block_list,
self._blocked_account_list,
self._budget_exceeded_list,
self._pending_list]

def clear(self):
self._tx_list.clear()
for lst in self._all_tx_list:
lst.clear()

def _get_full_list(self):
return [tx for lst in self._all_tx_list for tx in lst]

def send(self) -> SolTxListSender:
solana = self._s.solana
signer = self._s.signer
waiter = self._s.waiter
skip = self._skip_preflight
commitment = self._preflight_commitment

self.debug(f'start transactions sending: {self._name}')

while (self._retry_idx < RETRY_ON_FAIL) and (len(self._tx_list)):
self._retry_idx += 1
receipt_list = solana.send_multiple_transactions(signer, self._tx_list, waiter, skip, commitment)
self.update_transaction_cost(receipt_list)

success_cnt = 0
for receipt, tx in zip(receipt_list, self._tx_list):
if check_if_blockhash_notfound(receipt):
self._bad_block_list.append(tx)
elif check_if_accounts_blocked(receipt):
self._blocked_account_list.append(tx)
elif check_for_errors(receipt):
if check_if_program_exceeded_instructions(receipt):
self._budget_exceeded_list.append(tx)
else:
custom = check_if_storage_is_empty_error(receipt)
if custom in (1, 4):
self._storage_bad_status_list.append(receipt)
else:
raise SolTxError(receipt)
else:
success_cnt += 1
self._on_success_send(tx, receipt)

self.debug(f'retry {self._retry_idx}, ' +
f'total receipts {len(receipt_list)}, ' +
f'success receipts {success_cnt}, ' +
f'bad blocks {len(self._bad_block_list)}, ' +
f'blocked accounts {len(self._blocked_account_list)}, ' +
f'budget exceeded {len(self._budget_exceeded_list)}, ' +
f'bad storage: {len(self._storage_bad_status_list)}')

self._on_post_send()

if len(self._tx_list):
raise RuntimeError('Run out of attempts to execute transaction')
return self

def update_transaction_cost(self, receipt_list):
if not WRITE_TRANSACTION_COST_IN_DB:
return False
if not hasattr(self._s, 'eth_tx'):
return False

for receipt in receipt_list:
update_transaction_cost(receipt, self._s.eth_tx, reason=self._name)

def _on_success_send(self, tx: Transaction, receipt: {}) -> bool:
"""Store the last successfully blockhash and set it in _set_tx_blockhash"""
self._blockhash = tx.recent_blockhash
return False

def _on_post_send(self):
if len(self._storage_bad_status_list):
raise SolTxError(self._storage_bad_status_list[0])
elif len(self._budget_exceeded_list):
raise RuntimeError(COMPUTATION_BUDGET_EXCEEDED)

# There is no more retries to send transactions
if self._retry_idx >= RETRY_ON_FAIL:
if not self._is_canceled:
self._cancel()
return

if len(self._blocked_account_list):
time.sleep(0.4) # one block time

# force changing of recent_blockhash if Solana doesn't accept the current one
if len(self._bad_block_list):
self._blockhash = None

# resend not-accepted transactions
self._move_txlist()

def _set_tx_blockhash(self, tx):
"""Try to keep the branch of block history"""
tx.recent_blockhash = self._blockhash
tx.signatures.clear()

def _move_txlist(self):
full_list = self._get_full_list()
self.clear()
for tx in full_list:
self._set_tx_blockhash(tx)
self._tx_list.append(tx)
if len(self._tx_list):
self.debug(f' Resend Solana transactions: {len(self._tx_list)}')


@logged_group("neon.Proxy")
class Measurements:
def __init__(self):
pass

# Do not change headers in info logs! This name used in CI measurements (see function `cleanup_docker` in
# .buildkite/steps/deploy-test.sh)
def extract(self, reason: str, receipt: {}):
if not LOG_SENDING_SOLANA_TRANSACTION:
return

try:
self.debug(f"send multiple transactions for reason {reason}")

measurements = self._extract_measurements_from_receipt(receipt)
for m in measurements:
self.info(f'get_measurements: {json.dumps(m)}')
except Exception as err:
self.error(f"get_measurements: can't get measurements {err}")
self.info(f"get measurements: failed result {json.dumps(receipt, indent=3)}")

def _extract_measurements_from_receipt(self, receipt):
if check_for_errors(receipt):
self.warning("Can't get measurements from receipt with error")
Expand All @@ -483,37 +626,39 @@ def _extract_measurements_from_receipt(self, receipt):
res = pattern.match(log)
if res:
(program, reason) = res.groups()
if reason == 'invoke [1]': messages.append({'program':program,'logs':[]})
if reason == 'invoke [1]': messages.append({'program': program, 'logs': []})
messages[-1]['logs'].append(log)

for instr in instructions:
if instr['program'] in ('KeccakSecp256k11111111111111111111111111111',): continue
if messages[0]['program'] != instr['program']:
raise ValueError('Invalid program in log messages: expect %s, actual %s' % (messages[0]['program'], instr['program']))
raise ValueError('Invalid program in log messages: expect %s, actual %s' % (
messages[0]['program'], instr['program']))
instr['logs'] = messages.pop(0)['logs']
exit_result = re.match(r'Program %s (success)'%instr['program'], instr['logs'][-1])
exit_result = re.match(r'Program %s (success)' % instr['program'], instr['logs'][-1])
if not exit_result: raise ValueError("Can't get exit result")
instr['result'] = exit_result.group(1)

if instr['program'] == EVM_LOADER_ID:
memory_result = re.match(r'Program log: Total memory occupied: ([0-9]+)', instr['logs'][-3])
instruction_result = re.match(r'Program %s consumed ([0-9]+) of ([0-9]+) compute units'%instr['program'], instr['logs'][-2])
instruction_result = re.match(
r'Program %s consumed ([0-9]+) of ([0-9]+) compute units' % instr['program'], instr['logs'][-2])
if not (memory_result and instruction_result):
raise ValueError("Can't parse measurements for evm_loader")
instr['measurements'] = {
'instructions': instruction_result.group(1),
'memory': memory_result.group(1)
}
'instructions': instruction_result.group(1),
'memory': memory_result.group(1)
}

result = []
for instr in instructions:
if instr['program'] == EVM_LOADER_ID:
result.append({
'program':instr['program'],
'measurements':instr['measurements'],
'result':instr['result'],
'data':instr['data']
})
'program': instr['program'],
'measurements': instr['measurements'],
'result': instr['result'],
'data': instr['data']
})
return result


Expand Down
Loading