Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
R#406 Reduce the number of requests to solana rpc (#405)
* Reduce the number of requests to solana rpc

* Use JSON RPC batch processing

* Do not send create accounts trx if it was already sent

* Use batch processing for rent exempt balance

* Match between the set of Request objects and the resulting set of Response
  • Loading branch information
anton-lisanin authored Dec 27, 2021
commit 67e5656022ac49182946ecccb5b4e4e1347783c4
134 changes: 107 additions & 27 deletions proxy/common_neon/solana_interactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,61 @@
import logging
import re
import time
import requests

from solana.blockhash import Blockhash
from solana.publickey import PublicKey
from solana.rpc.api import Client as SolanaClient
from solana.rpc.api import SendTransactionError
from solana.rpc.commitment import Confirmed
from solana.rpc.types import TxOpts
from solana.rpc.types import RPCResponse, TxOpts
from solana.transaction import Transaction
from urllib.parse import urlparse
from itertools import zip_longest

from .costs import update_transaction_cost
from .utils import get_from_dict
from ..environment import EVM_LOADER_ID, CONFIRMATION_CHECK_DELAY, LOG_SENDING_SOLANA_TRANSACTION, RETRY_ON_FAIL

from typing import Any, List, NamedTuple, Union, cast

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

class AccountInfo(NamedTuple):
tag: int
lamports: int
owner: PublicKey

class SolanaInteractor:
def __init__(self, signer, client: SolanaClient) -> None:
self.signer = signer
self.client = client

def _send_rpc_batch_request(self, method: str, params_list: List[Any]) -> List[RPCResponse]:
request_data = []
for params in params_list:
request_id = next(self.client._provider._request_counter) + 1
request = {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params}
request_data.append(request)

response = requests.post(self.client._provider.endpoint_uri, headers={"Content-Type": "application/json"}, json=request_data)
response.raise_for_status()

response_data = cast(List[RPCResponse], response.json())
response_data.sort(key=lambda r: r["id"])

for request, response in zip_longest(request_data, response_data):
if request["id"] != response["id"]:
raise Exception("Invalid RPC response: request {} response {}", request, response)

return response_data


def get_operator_key(self):
return self.signer.public_key()


def get_account_info(self, storage_account):
def get_account_info(self, storage_account) -> AccountInfo:
opts = {
"encoding": "base64",
"commitment": "confirmed",
Expand All @@ -54,15 +82,39 @@ def get_account_info(self, storage_account):
lamports = info['lamports']
owner = info['owner']

return (account_tag, lamports, owner)
return AccountInfo(account_tag, lamports, owner)

def get_multiple_accounts_info(self, accounts: List[PublicKey]) -> List[AccountInfo]:
options = {
"encoding": "base64",
"commitment": "confirmed",
"dataSlice": { "offset": 0, "length": 16 }
}
result = self.client._provider.make_request("getMultipleAccounts", list(map(str, accounts)), options)
logger.debug("\n{}".format(json.dumps(result, indent=4, sort_keys=True)))

if result['result']['value'] is None:
logger.debug("Can't get information about {}".format(accounts))
return None

accounts_info = []
for info in result['result']['value']:
if info is None:
accounts_info.append(None)
else:
data = base64.b64decode(info['data'][0])
accounts_info.append(AccountInfo(tag=data[0], lamports=info['lamports'], owner=info['owner']))

return accounts_info

def get_sol_balance(self, account):
return self.client.get_balance(account, commitment=Confirmed)['result']['value']


def get_rent_exempt_balance_for_size(self, size):
return self.client.get_minimum_balance_for_rent_exemption(size, commitment=Confirmed)["result"]
def get_multiple_rent_exempt_balances_for_size(self, size_list: List[int]) -> List[int]:
request = map(lambda size: (size, {"commitment": "confirmed"}), size_list)
response = self._send_rpc_batch_request("getMinimumBalanceForRentExemption", request)
return list(map(lambda r: r["result"], response))


def _getAccountData(self, account, expected_length, owner=None):
Expand Down Expand Up @@ -109,12 +161,23 @@ def send_transaction_unconfirmed(self, txn: Transaction):
raise
raise RuntimeError("Failed trying {} times to get Blockhash for transaction {}".format(RETRY_ON_FAIL, txn.__dict__))

def send_multiple_transactions_unconfirmed(self, transactions: List[Transaction], skip_preflight: bool = True) -> List[str]:
blockhash_resp = self.client.get_recent_blockhash() # commitment=Confirmed
if not blockhash_resp["result"]:
raise RuntimeError("failed to get recent blockhash")

def collect_result(self, reciept, eth_trx, reason=None):
self.confirm_transaction(reciept)
result = self.client.get_confirmed_transaction(reciept)
update_transaction_cost(result, eth_trx, reason)
return result
blockhash = blockhash_resp["result"]["value"]["blockhash"]

request = []
for transaction in transactions:
transaction.recent_blockhash = blockhash
transaction.sign(self.signer)

base64_transaction = base64.b64encode(transaction.serialize()).decode("utf-8")
request.append((base64_transaction, {"skipPreflight": skip_preflight, "encoding": "base64", "preflightCommitment": "confirmed"}))

response = self._send_rpc_batch_request("sendTransaction", request)
return list(map(lambda r: r["result"], response))


def send_measured_transaction(self, trx, eth_trx, reason):
Expand All @@ -134,30 +197,47 @@ def get_measurements(self, result):
logger.error("Can't get measurements %s"%err)
logger.info("Failed result: %s"%json.dumps(result, indent=3))


def confirm_transaction(self, tx_sig, confirmations=0):
def confirm_multiple_transactions(self, signatures: List[Union[str, bytes]]):
"""Confirm a transaction."""
TIMEOUT = 30 # 30 seconds pylint: disable=invalid-name
elapsed_time = 0
while elapsed_time < TIMEOUT:
logger.debug('confirm_transaction for %s', tx_sig)
resp = self.client.get_signature_statuses([tx_sig])
logger.debug('confirm_transaction: %s', resp)
if resp["result"]:
status = resp['result']['value'][0]
if status and (status['confirmationStatus'] == 'finalized' or \
status['confirmationStatus'] == 'confirmed' and status['confirmations'] >= confirmations):
return
response = self.client.get_signature_statuses(signatures)
logger.debug('confirm_transactions: %s', response)
if response['result'] is None:
continue

for status in response['result']['value']:
if status is None:
break
if status['confirmationStatus'] == 'processed':
break
else:
return

time.sleep(CONFIRMATION_CHECK_DELAY)
elapsed_time += CONFIRMATION_CHECK_DELAY
raise RuntimeError("could not confirm transaction: ", tx_sig)

raise RuntimeError("could not confirm transactions: ", signatures)

def get_multiple_confirmed_transactions(self, signatures: List[str]) -> List[RPCResponse]:
request = map(lambda signature: (signature, {"encoding": "json", "commitment": "confirmed"}), signatures)
return self._send_rpc_batch_request("getTransaction", request)

def collect_results(self, receipts, eth_trx=None, reason=None):
results = []
for rcpt in receipts:
results.append(self.collect_result(rcpt, eth_trx, reason))
return results
def collect_results(self, receipts: List[str], eth_trx: Any = None, reason: str = None) -> List[RPCResponse]:
self.confirm_multiple_transactions(receipts)
transactions = self.get_multiple_confirmed_transactions(receipts)

for transaction in transactions:
update_transaction_cost(transaction, eth_trx, reason)

return transactions

def collect_result(self, reciept, eth_trx, reason=None):
self.confirm_multiple_transactions([reciept])
result = self.client.get_confirmed_transaction(reciept)
update_transaction_cost(result, eth_trx, reason)
return result

@staticmethod
def extract_measurements_from_receipt(receipt):
Expand Down
77 changes: 29 additions & 48 deletions proxy/common_neon/transaction_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import math
import os
from typing import List
import rlp
import time

Expand Down Expand Up @@ -138,7 +139,7 @@ def create_account_with_seed(self, seed, storage_size):
account = accountWithSeed(self.sender.get_operator_key(), seed)

if self.sender.get_sol_balance(account) == 0:
minimum_balance = self.sender.get_rent_exempt_balance_for_size(storage_size)
minimum_balance = self.sender.get_multiple_rent_exempt_balances_for_size([storage_size])[0]
logger.debug("Minimum balance required for account {}".format(minimum_balance))

trx = Transaction()
Expand All @@ -148,29 +149,24 @@ def create_account_with_seed(self, seed, storage_size):
return account


def create_multiple_accounts_with_seed(self, seeds, sizes):
accounts = []
trx = Transaction()

for seed, storage_size in zip(seeds, sizes):
account = accountWithSeed(self.sender.get_operator_key(), seed)
accounts.append(account)
def create_multiple_accounts_with_seed(self, seeds: List[bytes], sizes: List[int]) -> List[PublicKey]:
accounts = list(map(lambda seed: accountWithSeed(self.sender.get_operator_key(), seed), seeds))
accounts_info = self.sender.get_multiple_accounts_info(accounts)
minimum_balances = self.sender.get_multiple_rent_exempt_balances_for_size(sizes)

minimum_balance = self.sender.get_rent_exempt_balance_for_size(storage_size)
trx = Transaction()

account_info = self.sender.get_account_info(account)
for account_key, account_info, seed, minimum_balance, storage_size in zip(accounts, accounts_info, seeds, minimum_balances, sizes):
if account_info is None:
logger.debug("Minimum balance required for account {}".format(minimum_balance))

trx.add(self.instruction.create_account_with_seed_trx(account, seed, minimum_balance, storage_size))
trx.add(self.instruction.create_account_with_seed_trx(account_key, seed, minimum_balance, storage_size))
else:
(tag, lamports, owner) = account_info
if lamports < minimum_balance:
if account_info.lamports < minimum_balance:
raise Exception("insufficient balance")
if PublicKey(owner) != PublicKey(EVM_LOADER_ID):
if PublicKey(account_info.owner) != PublicKey(EVM_LOADER_ID):
raise Exception("wrong owner")
if tag not in {EMPTY_STORAGE_TAG, FINALIZED_STORAGE_TAG}:
raise Exception("not empty, not finalized")
if account_info.tag not in {EMPTY_STORAGE_TAG, FINALIZED_STORAGE_TAG}:
raise Exception("not empty, not finalized")

if len(trx.instructions) > 0:
self.sender.send_transaction(trx, eth_trx=self.eth_trx, reason='createAccountWithSeed')
Expand Down Expand Up @@ -250,7 +246,7 @@ def create_account_list_by_emulate(self):
code_account = accountWithSeed(self.sender.get_operator_key(), seed)
logger.debug(" with code account %s", code_account)
code_size = acc_desc["code_size"] + 2048
code_account_balance = self.sender.get_rent_exempt_balance_for_size(code_size)
code_account_balance = self.sender.get_multiple_rent_exempt_balances_for_size([code_size])[0]
self.create_acc_trx.add(self.instruction.create_account_with_seed_trx(code_account, seed, code_account_balance, code_size))
# add_keys_05.append(AccountMeta(pubkey=code_account, is_signer=False, is_writable=acc_desc["writable"]))
code_account_writable = acc_desc["writable"]
Expand Down Expand Up @@ -343,32 +339,29 @@ def __init__(self, solana_interactor: SolanaInteractor, neon_instruction: NeonIn


def call_signed_iterative_combined(self):
self.create_accounts_for_trx()
if len(self.create_acc_trx.instructions) > 0:
create_accounts_siganture = self.sender.send_transaction_unconfirmed(self.create_acc_trx)
self.sender.confirm_multiple_transactions([create_accounts_siganture])
self.create_acc_trx = Transaction()

self.instruction_type = self.CONTINUE_COMBINED
return self.call_continue()


def call_signed_with_holder_combined(self):
self.write_trx_to_holder_account()
self.create_accounts_for_trx()
self.instruction_type = self.CONTINUE_HOLDER_COMB
return self.call_continue()

precall_transactions = self.make_write_to_holder_account_trx()
if len(self.create_acc_trx.instructions) > 0:
precall_transactions.append(self.create_acc_trx)
signatures = self.sender.send_multiple_transactions_unconfirmed(precall_transactions)
self.sender.confirm_multiple_transactions(signatures)

def create_accounts_for_trx(self):
length = len(self.create_acc_trx.instructions)
if length == 0:
return
logger.debug(f"Create account for trx: {length}")
precall_txs = Transaction()
precall_txs.add(self.create_acc_trx)
result = self.sender.send_measured_transaction(precall_txs, self.eth_trx, 'CreateAccountsForTrx')
if check_for_errors(result):
raise Exception("Failed to create account for trx")
self.create_acc_trx = Transaction()

self.instruction_type = self.CONTINUE_HOLDER_COMB
return self.call_continue()


def write_trx_to_holder_account(self):
def make_write_to_holder_account_trx(self) -> List[Transaction]:
logger.debug('write_trx_to_holder_account')
msg = self.eth_trx.signature() + len(self.eth_trx.unsigned_msg()).to_bytes(8, byteorder="little") + self.eth_trx.unsigned_msg()

Expand All @@ -381,19 +374,7 @@ def write_trx_to_holder_account(self):
trxs.append(trx)
offset += len(part)

while len(trxs) > 0:
receipts = {}
for trx in trxs:
receipts[self.sender.send_transaction_unconfirmed(trx)] = trx

logger.debug("receipts %s", receipts)
for rcpt, trx in receipts.items():
try:
self.sender.collect_result(rcpt, eth_trx=self.eth_trx, reason='WriteHolder')
except Exception as err:
logger.debug("collect_result exception: {}".format(str(err)))
else:
trxs.remove(trx)
return trxs


def call_continue(self):
Expand Down