diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 77e01b455..7274fd096 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -9,7 +9,7 @@ steps: - label: ":terraform: build infrastructure" key: "create_infrastructure" if: &is_fts_enabled | - (build.pull_request.base_branch == "712-mempool" && !build.pull_request.draft) || + (build.pull_request.base_branch == "develop" && !build.pull_request.draft) || (build.source == "trigger_job" && build.env("NEON_EVM_FULL_TEST_SUITE") == "true") agents: queue: "testing" diff --git a/proxy/common_neon/data.py b/proxy/common_neon/data.py index 80e660945..6bcc8c8c8 100644 --- a/proxy/common_neon/data.py +++ b/proxy/common_neon/data.py @@ -22,9 +22,3 @@ def __init__(self, neon_tx_hash: str, neon_income: int, tx_type: str, is_cancele def add_instruction(self, sol_tx_hash: str, sol_spent: int, steps: int, bpf: int) -> None: self.instructions.append((sol_tx_hash, sol_spent, steps, bpf)) - - -@dataclass -class NeonTxData: - tx_signed: str - diff --git a/proxy/common_neon/utils/pickable_data_server.py b/proxy/common_neon/utils/pickable_data_server.py index e335f4a22..5a54df3eb 100644 --- a/proxy/common_neon/utils/pickable_data_server.py +++ b/proxy/common_neon/utils/pickable_data_server.py @@ -12,7 +12,7 @@ class PickableDataServerUser(ABC): @abstractmethod - def on_data_received(self, data: Any) -> Any: + async def on_data_received(self, data: Any) -> Any: """Gets neon_tx_data from the neon rpc api service worker""" @@ -37,23 +37,25 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter): while True: try: data = await self._recv_pickable_data(reader) - result = self._user.on_data_received(data) + result = await self._user.on_data_received(data) result_data = encode_pickable(result) writer.write(result_data) await writer.drain() except ConnectionResetError: - self.error(f"Client connection has been closed") + break + except asyncio.exceptions.IncompleteReadError as err: + self.error(f"Incomplete read error: {err}") break except Exception as err: self.error(f"Failed to receive data err: {err}, type: {type(err)}") break async def _recv_pickable_data(self, reader: StreamReader): - len_packed: bytes = await reader.readexactly(4) + len_packed: bytes = await reader.read(4) if len(len_packed) == 0: raise ConnectionResetError() payload_len_data = struct.unpack("!I", len_packed)[0] - payload = await reader.readexactly(payload_len_data) + payload = await reader.read(payload_len_data) data = pickle.loads(payload) return data @@ -78,29 +80,23 @@ def __init__(self, *, user: PickableDataServerUser, srv_sock: socket.socket): PickableDataServer.__init__(self, user=user) async def run_server(self): - print("run_server_by_conn") reader, writer = await asyncio.streams.open_connection(sock=self._srv_sock) - print("Got reader, writer") await self.handle_client(reader, writer) @logged_group("neon.Proxy") class PickableDataClient: - CONNECTION_TIMEOUT_SEC = 5 - def __init__(self): self._client_sock = None def _set_client_sock(self, client_sock: socket.socket): self._client_sock = client_sock - self._client_sock.setblocking(False) - self._client_sock.settimeout(self.CONNECTION_TIMEOUT_SEC) def send_data(self, pickable_object: Any): try: payload = encode_pickable(pickable_object) - self._client_sock.send(payload) + sent = self._client_sock.send(payload) len_packed: bytes = self._client_sock.recv(4) data_len = struct.unpack("!I", len_packed)[0] data = self._client_sock.recv(data_len) @@ -110,29 +106,26 @@ def send_data(self, pickable_object: Any): return result except BaseException as err: self.error(f"Failed to send data: {err}") - raise Exception("Failed to send pickable data") + raise async def send_data_async(self, pickable_object): - reader, writer = await asyncio.streams.open_connection(sock=self._client_sock) + loop = asyncio.get_event_loop() try: payload = encode_pickable(pickable_object) - writer.write(payload) - await writer.drain() - len_packed: bytes = await reader.readexactly(4) + await loop.sock_sendall(self._client_sock, payload) + + len_packed: bytes = await loop.sock_recv(self._client_sock, 4) if not len_packed: return None data_len = struct.unpack("!I", len_packed)[0] - data = await reader.readexactly(data_len) + data = await loop.sock_recv(self._client_sock, data_len) if not data: return None result = pickle.loads(data) return result except BaseException as err: self.error(f"Failed to send data: {err}") - raise Exception("Failed to send pickable data") - - def __del__(self): - self._client_sock.close() + raise class PipePickableDataClient(PickableDataClient): diff --git a/proxy/memdb/transactions_db.py b/proxy/memdb/transactions_db.py index cb220314f..e43bc54f5 100644 --- a/proxy/memdb/transactions_db.py +++ b/proxy/memdb/transactions_db.py @@ -83,6 +83,8 @@ def _has_topics(src_topics, dst_topics): return False result_list = [] + indexed_logs = self._db.get_logs(from_block, to_block, addresses, topics, block_hash) + with self._tx_slot.get_lock(): for data in self._tx_by_neon_sign.values(): tx = pickle.loads(data) @@ -97,9 +99,10 @@ def _has_topics(src_topics, dst_topics): continue if len(topics) and (not _has_topics(topics, log['topics'])): continue + if log in indexed_logs: + continue result_list.append(log) - - return result_list + self._db.get_logs(from_block, to_block, addresses, topics, block_hash) + return indexed_logs + result_list def get_sol_sign_list_by_neon_sign(self, neon_sign: str, is_pended_tx: bool, before_slot: int) -> [str]: if not is_pended_tx: diff --git a/proxy/mempool/__init__.py b/proxy/mempool/__init__.py index ec2e99105..84d5d1624 100644 --- a/proxy/mempool/__init__.py +++ b/proxy/mempool/__init__.py @@ -1,7 +1,7 @@ from .mempool_client import MemPoolClient -from .mempool_service import MemPoolService -from .mem_pool import MemPool +from .mempool_service import MPService +from .mempool import MemPool from .mempool_api import * -MEMPOOL_SERVICE_PORT = MemPoolService.MEMPOOL_SERVICE_PORT -MEMPOOL_SERVICE_HOST = MemPoolService.MEMPOOL_SERVICE_HOST +MP_SERVICE_PORT = MPService.MP_SERVICE_PORT +MP_SERVICE_HOST = MPService.MP_SERVICE_HOST diff --git a/proxy/mempool/executor_mng.py b/proxy/mempool/executor_mng.py new file mode 100644 index 000000000..91bac4898 --- /dev/null +++ b/proxy/mempool/executor_mng.py @@ -0,0 +1,91 @@ +import asyncio +import dataclasses +import socket + +from collections import deque +from typing import List, Tuple, Deque, Set +from logged_groups import logged_group + +from ..common_neon.config import IConfig +from ..common_neon.utils import PipePickableDataClient + +from .mempool_api import MPRequest, IMPExecutor +from .mempool_executor import MPExecutor + + +class MpExecutorClient(PipePickableDataClient): + + def __init__(self, client_sock: socket.socket): + PipePickableDataClient.__init__(self, client_sock=client_sock) + + async def send_tx_request(self, mempool_tx_request: MPRequest): + return await self.send_data_async(mempool_tx_request) + + +@logged_group("neon.MemPool") +class MPExecutorMng(IMPExecutor): + + BRING_BACK_EXECUTOR_TIMEOUT_SEC = 1800 + + @dataclasses.dataclass + class ExecutorInfo: + executor: MPExecutor + client: MpExecutorClient + id: int + + def __init__(self, executor_count: int, config: IConfig): + self.info(f"Initialize executor mng with executor_count: {executor_count}") + self._available_executor_pool: Deque[int] = deque() + self._busy_executor_pool: Set[int] = set() + self._executors: List[MPExecutorMng.ExecutorInfo] = list() + for i in range(executor_count): + executor_info = MPExecutorMng._create_executor(i, config) + self._executors.append(executor_info) + self._available_executor_pool.appendleft(i) + executor_info.executor.start() + + def submit_mp_request(self, mp_reqeust: MPRequest) -> Tuple[int, asyncio.Task]: + executor_id, executor = self._get_executor() + tx_hash = "0x" + mp_reqeust.neon_tx.hash_signed().hex() + self.debug(f"Tx: {tx_hash} - scheduled on executor: {executor_id}") + task = asyncio.get_event_loop().create_task(executor.send_data_async(mp_reqeust)) + return executor_id, task + + def is_available(self) -> bool: + return self._has_available() + + def _has_available(self) -> bool: + return len(self._available_executor_pool) > 0 + + def _get_executor(self) -> Tuple[int, MpExecutorClient]: + executor_id = self._available_executor_pool.pop() + self.debug(f"Acquire executor: {executor_id}") + self._busy_executor_pool.add(executor_id) + executor_info = self._executors[executor_id] + return executor_id, executor_info.client + + def on_no_liquidity(self, resource_id: int): + self.debug(f"No liquidity, executor: {resource_id} - will be unblocked in: {MPExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC} sec") + asyncio.get_event_loop().create_task(self._release_executor_later(resource_id)) + + async def _release_executor_later(self, executor_id: int): + await asyncio.sleep(MPExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC) + self.release_resource(executor_id) + + def release_resource(self, resource_id: int): + self.debug(f"Release executor: {resource_id}") + self._busy_executor_pool.remove(resource_id) + self._available_executor_pool.appendleft(resource_id) + + @staticmethod + def _create_executor(executor_id: int, config: IConfig) -> ExecutorInfo: + client_sock, srv_sock = socket.socketpair() + executor = MPExecutor(executor_id, srv_sock, config) + client = MpExecutorClient(client_sock) + return MPExecutorMng.ExecutorInfo(executor=executor, client=client, id=executor_id) + + def __del__(self): + for executor_info in self._executors: + executor_info.executor.kill() + self._busy_executor_pool.clear() + self._available_executor_pool.clear() diff --git a/proxy/mempool/mem_pool.py b/proxy/mempool/mem_pool.py deleted file mode 100644 index 072393c67..000000000 --- a/proxy/mempool/mem_pool.py +++ /dev/null @@ -1,30 +0,0 @@ -from logged_groups import logged_group -from concurrent.futures import ProcessPoolExecutor - -from ..common_neon.config import IConfig - -from .mempool_api import MemPoolTxRequest -from .mempool_tx_executor import MemPoolTxExecutor - - -@logged_group("neon.MemPool") -class MemPool: - - POOL_PROC_COUNT = 8 - - def __init__(self, config: IConfig): - self._pool = ProcessPoolExecutor(self.POOL_PROC_COUNT) - self._tx_executor = MemPoolTxExecutor(config) - - def send_raw_transaction(self, mempool_tx_request: MemPoolTxRequest) -> bool: - try: - self._pool.submit(MemPool._send_raw_transaction_impl, mempool_tx_request) - except Exception as err: - print(f"Failed enqueue mempool_tx_request into the worker pool: {err}") - return False - return True - - @staticmethod - def _send_raw_transaction_impl(mempool_tx_request: MemPoolTxRequest) -> bool: - print(f"mempool_tx_request: {mempool_tx_request}") - return True diff --git a/proxy/mempool/mempool.py b/proxy/mempool/mempool.py new file mode 100644 index 000000000..9986d262a --- /dev/null +++ b/proxy/mempool/mempool.py @@ -0,0 +1,151 @@ +import asyncio +from typing import List, Tuple, Dict +from logged_groups import logged_group +import bisect + +from .mempool_api import MPRequest, MPResultCode, MPResult, IMPExecutor, MPRequestType, \ + MPTxRequest, MPPendingTxCountReq + + +@logged_group("neon.MemPool") +class MemPool: + + TX_QUEUE_MAX_SIZE = 4096 + TX_QUEUE_SIZE = 4095 + CHECK_TASK_TIMEOUT_SEC = 0.05 + + def __init__(self, executor: IMPExecutor): + self._req_queue = [] + self._lock = asyncio.Lock() + self._req_queue_cond = asyncio.Condition() + self._processing_tasks: List[Tuple[int, asyncio.Task, MPRequest]] = [] + # signer -> pending_tx_counter + self._pending_trx_counters: Dict[str, int] = {} + self._process_tx_results_task = asyncio.get_event_loop().create_task(self.check_processing_tasks()) + self._process_tx_queue_task = asyncio.get_event_loop().create_task(self.process_tx_queue()) + + self._executor = executor + + async def enqueue_mp_request(self, mp_request: MPRequest): + if mp_request.type == MPRequestType.SendTransaction: + tx_request: MPTxRequest = mp_request + return await self.on_send_tx_request(tx_request) + elif mp_request.type == MPRequestType.GetTrxCount: + pending_count_req: MPPendingTxCountReq = mp_request + return self.get_pending_trx_count(pending_count_req.sender) + + async def on_send_tx_request(self, mp_request: MPTxRequest): + await self.enqueue_mp_transaction(mp_request) + sender = "0x" + mp_request.neon_tx.sender() + self._inc_pending_tx_counter(sender) + count = self.get_pending_trx_count(sender) + self.debug(f"On send tx request. Sender: {sender}, pending tx count: {count}") + + async def enqueue_mp_transaction(self, mp_request: MPTxRequest): + tx_hash = mp_request.neon_tx.hash_signed().hex() + log_ctx = {"context": {"req_id": mp_request.req_id}} + try: + self.debug(f"Got mp_tx_request: 0x{tx_hash} to be scheduled on the mempool", extra=log_ctx) + if len(self._req_queue) > MemPool.TX_QUEUE_MAX_SIZE: + self._req_queue = self._req_queue[-MemPool.TX_QUEUE_SIZE:] + bisect.insort_left(self._req_queue, mp_request) + await self._kick_tx_queue() + except Exception as err: + self.error(f"Failed enqueue tx: {tx_hash} into queue: {err}", extra=log_ctx) + + def get_pending_trx_count(self, sender: str): + return self._pending_trx_counters.get(sender, 0) + + async def process_tx_queue(self): + while True: + async with self._req_queue_cond: + await self._req_queue_cond.wait() + if len(self._req_queue) == 0: + self.debug("Tx queue empty - continue waiting for new") + continue + if not self._executor.is_available(): + self.debug("No way to process tx - no available executor") + continue + mp_request: MPRequest = self._req_queue.pop() + self.submit_request_to_executor(mp_request) + + def submit_request_to_executor(self, mp_tx_request: MPRequest): + resource_id, task = self._executor.submit_mp_request(mp_tx_request) + self._processing_tasks.append((resource_id, task, mp_tx_request)) + + async def check_processing_tasks(self): + while True: + not_finished_tasks = [] + for resource_id, task, mp_request in self._processing_tasks: + if not task.done(): + not_finished_tasks.append((resource_id, task, mp_request)) + self._executor.release_resource(resource_id) + continue + exception = task.exception() + if exception is not None: + log_ctx = {"context": {"req_id": mp_request.req_id}} + self.error(f"Exception during processing request: {exception} - tx will be dropped away", extra=log_ctx) + self._on_request_dropped_away(mp_request) + self._executor.release_resource(resource_id) + continue + + mp_result: MPResult = task.result() + assert isinstance(mp_result, MPResult) + assert mp_result.code != MPResultCode.Dummy + await self._process_mp_result(resource_id, mp_result, mp_request) + + self._processing_tasks = not_finished_tasks + await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC) + + async def _process_mp_result(self, resource_id: int, mp_result: MPResult, mp_request: MPTxRequest): + tx_hash = "0x" + mp_request.neon_tx.hash_signed().hex() + log_ctx = {"context": {"req_id": mp_request.req_id}} + if mp_result.code == MPResultCode.Done: + self.debug(f"Neon tx: {tx_hash} - processed on executor: {resource_id} - done", extra=log_ctx) + self._on_request_done(mp_request) + self._executor.release_resource(resource_id) + await self._kick_tx_queue() + return + self.warning(f"Failed to process tx: {tx_hash} - on executor: {resource_id}, status: {mp_result} - reschedule", extra=log_ctx) + if mp_result.code == MPResultCode.BlockedAccount: + self._executor.release_resource(resource_id) + await self.enqueue_mp_request(mp_request) + await self._kick_tx_queue() + elif mp_result.code == MPResultCode.NoLiquidity: + self._executor.on_no_liquidity(resource_id) + await self.enqueue_mp_request(mp_request) + await self._kick_tx_queue() + elif mp_result.code == MPResultCode.Unspecified: + self._executor.release_resource(resource_id) + self._on_request_dropped_away(mp_request) + await self._kick_tx_queue() + + def _on_request_done(self, tx_request: MPTxRequest): + sender = "0x" + tx_request.neon_tx.sender() + self._dec_pending_tx_counter(sender) + count = self.get_pending_trx_count(sender) + log_ctx = {"context": {"req_id": tx_request.req_id}} + self.debug(f"Reqeust done. Sender: {sender}, pending tx count: {count}", extra=log_ctx) + + def _on_request_dropped_away(self, tx_request: MPTxRequest): + sender = "0x" + tx_request.neon_tx.sender() + self._dec_pending_tx_counter(sender) + count = self.get_pending_trx_count(sender) + log_ctx = {"context": {"req_id": tx_request.req_id}} + self.debug(f"Reqeust dropped away. Sender: {sender}, pending tx count: {count}", extra=log_ctx) + + def _inc_pending_tx_counter(self, sender: str): + counts = self._pending_trx_counters.get(sender, 0) + self._pending_trx_counters.update({sender: counts + 1}) + + def _dec_pending_tx_counter(self, sender: str): + count = self._pending_trx_counters.get(sender, 0) + assert count > 0 + count = count - 1 + if count == 0: + del self._pending_trx_counters[sender] + self._pending_trx_counters.update({sender: count}) + + async def _kick_tx_queue(self): + async with self._req_queue_cond: + self._req_queue_cond.notify() diff --git a/proxy/mempool/mempool_api.py b/proxy/mempool/mempool_api.py index 4f515a224..bc5dc3226 100644 --- a/proxy/mempool/mempool_api.py +++ b/proxy/mempool/mempool_api.py @@ -1,11 +1,78 @@ -from dataclasses import dataclass +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Any, Tuple +from abc import ABC, abstractmethod +from asyncio import Task from ..common_neon.eth_proto import Trx as NeonTx from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult +class IMPExecutor(ABC): + + @abstractmethod + def submit_mp_request(self, mp_reqeust: MPRequest) -> Tuple[int, Task]: + pass + + @abstractmethod + def is_available(self) -> bool: + pass + + @abstractmethod + def on_no_liquidity(self, resource_id: int): + pass + + @abstractmethod + def release_resource(self, resource_id: int): + pass + + +class MPRequestType(IntEnum): + SendTransaction = 0, + GetTrxCount = 1, + Dummy = -1 + + +@dataclass(order=True) +class MPRequest: + req_id: int + type: MPRequestType = field(default=MPRequestType.Dummy) + + +@dataclass +class MPTxRequest(MPRequest): + signature: str = field(compare=False, default=None) + neon_tx: NeonTx = field(compare=False, default=None) + neon_tx_exec_cfg: NeonTxExecCfg = field(compare=False, default=None) + emulating_result: NeonEmulatingResult = field(compare=False, default=None) + _gas_price: int = field(compare=True, default=None) + + def __post_init__(self): + self._gas_price = self.neon_tx.gasPrice + self.type = MPRequestType.SendTransaction + + +@dataclass +class MPPendingTxCountReq(MPRequest): + + sender: str = None + + def __post_init__(self): + self.type = MPRequestType.GetTrxCount + + +class MPResultCode(IntEnum): + Done = 0 + BlockedAccount = 1, + SolanaUnavailable = 2, + NoLiquidity = 3, + Unspecified = 4, + Dummy = -1 + + @dataclass -class MemPoolTxRequest: - neon_tx: NeonTx - neon_tx_exec_cfg: NeonTxExecCfg - emulating_result: NeonEmulatingResult +class MPResult: + code: MPResultCode + data: Any diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index e0be95015..9cb5c9b21 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -1,12 +1,25 @@ +from logged_groups import logged_group + from ..common_neon.utils import AddrPickableDataClient -from .mempool_api import MemPoolTxRequest +from .mempool_api import MPTxRequest, MPPendingTxCountReq + +from ..common_neon.eth_proto import Trx as NeonTx +from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult +@logged_group("neon.Proxy") class MemPoolClient: def __init__(self, host: str, port: int): self._pickable_data_client = AddrPickableDataClient((host, port)) - def send_raw_transaction(self, mempool_tx_request: MemPoolTxRequest): - self._pickable_data_client.send_data(mempool_tx_request) + def send_raw_transaction(self, req_id: int, signature: str, neon_tx: NeonTx, neon_tx_exec_cfg: NeonTxExecCfg, + emulating_result: NeonEmulatingResult): + mempool_tx_request = MPTxRequest(req_id=req_id, signature=signature, neon_tx=neon_tx, + neon_tx_exec_cfg=neon_tx_exec_cfg, emulating_result=emulating_result) + return self._pickable_data_client.send_data(mempool_tx_request) + + def get_pending_tx_count(self, req_id: int, sender: str): + mempool_pending_tx_count_req = MPPendingTxCountReq(req_id=req_id, sender=sender) + return self._pickable_data_client.send_data(mempool_pending_tx_count_req) diff --git a/proxy/mempool/mempool_executor.py b/proxy/mempool/mempool_executor.py new file mode 100644 index 000000000..127ee4b02 --- /dev/null +++ b/proxy/mempool/mempool_executor.py @@ -0,0 +1,65 @@ +import asyncio +import multiprocessing as mp +import socket + +from logged_groups import logged_group, logging_context + +from ..common_neon.solana_interactor import SolanaInteractor +from ..common_neon.config import IConfig +from ..common_neon.utils import PipePickableDataSrv, PickableDataServerUser, Any +from ..common_neon.config import Config +from ..memdb.memdb import MemDB + +from .transaction_sender import NeonTxSender +from .operator_resource_list import OperatorResourceList +from .mempool_api import MPRequest, MPResult, MPResultCode + + +@logged_group("neon.MemPool") +class MPExecutor(mp.Process, PickableDataServerUser): + + def __init__(self, executor_id: int, srv_sock: socket.socket, config: IConfig): + self.info(f"Initialize mempool_executor: {executor_id}") + self._id = executor_id + self._srv_sock = srv_sock + self._config = config + self.info(f"Config: {self._config}") + self._event_loop: asyncio.BaseEventLoop + self._solana: SolanaInteractor + self._db: MemDB + self._pickable_data_srv = None + mp.Process.__init__(self) + + def _init_in_proc(self): + self.info(f"Config: {self._config}") + self._event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._event_loop) + self._pickable_data_srv = PipePickableDataSrv(user=self, srv_sock=self._srv_sock) + self._solana = SolanaInteractor(self._config.get_solana_url()) + self._db = MemDB(self._solana) + + def execute_neon_tx(self, mempool_request: MPRequest): + with logging_context(req_id=mempool_request.req_id, exectr=self._id): + try: + self.execute_neon_tx_impl(mempool_request) + except Exception as err: + self.error(f"Failed to execute neon_tx: {err}") + return MPResult(MPResultCode.Unspecified, None) + return MPResult(MPResultCode.Done, None) + + def execute_neon_tx_impl(self, mempool_tx_cfg: MPRequest): + neon_tx = mempool_tx_cfg.neon_tx + neon_tx_cfg = mempool_tx_cfg.neon_tx_exec_cfg + emulating_result = mempool_tx_cfg.emulating_result + emv_step_count = self._config.get_evm_count() + tx_sender = NeonTxSender(self._db, self._solana, neon_tx, steps=emv_step_count) + with OperatorResourceList(tx_sender): + tx_sender.execute(neon_tx_cfg, emulating_result) + + async def on_data_received(self, data: Any) -> Any: + return self.execute_neon_tx(data) + + def run(self) -> None: + self._config = Config() + self._init_in_proc() + self._event_loop.run_forever() diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index 1381d66ce..8478cfd9f 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -1,26 +1,28 @@ from logged_groups import logged_group import asyncio from multiprocessing import Process +from typing import Any from ..common_neon.utils.pickable_data_server import AddrPickableDataSrv, PickableDataServerUser from ..common_neon.config import IConfig -from .mem_pool import MemPool - -from typing import Any +from .mempool import MemPool +from .executor_mng import MPExecutorMng @logged_group("neon.MemPool") -class MemPoolService(PickableDataServerUser): +class MPService(PickableDataServerUser): - MEMPOOL_SERVICE_PORT = 9091 - MEMPOOL_SERVICE_HOST = "0.0.0.0" + MP_SERVICE_PORT = 9091 + MP_SERVICE_HOST = "0.0.0.0" + EXECUTOR_COUNT = 8 def __init__(self, config: IConfig): self.event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.event_loop) self._mempool_server = None self._mempool = None + self._mp_executor_mng = None self._process = Process(target=self.run) self._config = config @@ -28,10 +30,11 @@ def start(self): self.info("Run until complete") self._process.start() - def on_data_received(self, data: Any) -> Any: - return self._mempool.send_raw_transaction(data) + async def on_data_received(self, data: Any) -> Any: + return await self._mempool.enqueue_mp_request(data) def run(self): - self._mempool_server = AddrPickableDataSrv(user=self, address=(self.MEMPOOL_SERVICE_HOST, self.MEMPOOL_SERVICE_PORT)) - self._mempool = MemPool(self._config) + self._mempool_server = AddrPickableDataSrv(user=self, address=(self.MP_SERVICE_HOST, self.MP_SERVICE_PORT)) + self._mp_executor_mng = MPExecutorMng(self.EXECUTOR_COUNT, self._config) + self._mempool = MemPool(self._mp_executor_mng) self.event_loop.run_forever() diff --git a/proxy/mempool/mempool_tx_executor.py b/proxy/mempool/mempool_tx_executor.py deleted file mode 100644 index eb4f7e23b..000000000 --- a/proxy/mempool/mempool_tx_executor.py +++ /dev/null @@ -1,29 +0,0 @@ -from logged_groups import logged_group - -from ..common_neon.solana_interactor import SolanaInteractor -from ..common_neon.config import IConfig -from ..memdb.memdb import MemDB - -# TODO: NeonTxSender should be moved out from there -from .transaction_sender import NeonTxSender -from .operator_resource_list import OperatorResourceList -from .mempool_api import MemPoolTxRequest - - -@logged_group("neon.MemPool") -class MemPoolTxExecutor: - - def __init__(self, config: IConfig): - - self._solana = SolanaInteractor(config.get_solana_url()) - self._db = MemDB(self._solana) - self._config = config - - def execute_neon_tx(self, mempool_tx_request: MemPoolTxRequest): - neon_tx = mempool_tx_request.neon_tx - neon_tx_cfg = mempool_tx_request.neon_tx_exec_cfg - emulating_result = mempool_tx_request.emulating_result - emv_step_count = self._config.get_evm_count() - tx_sender = NeonTxSender(self._db, self._solana, neon_tx, steps=emv_step_count) - with OperatorResourceList(tx_sender): - tx_sender.execute(neon_tx_cfg, emulating_result) diff --git a/proxy/mempool/neon_tx_stages.py b/proxy/mempool/neon_tx_stages.py index 08b3a408b..308c62c3c 100644 --- a/proxy/mempool/neon_tx_stages.py +++ b/proxy/mempool/neon_tx_stages.py @@ -29,7 +29,7 @@ def build(self): pass -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCancelTxStage(NeonTxStage, abc.ABC): NAME = 'cancelWithNonce' @@ -78,7 +78,7 @@ def _create_account_with_seed(self): return self.s.builder.create_account_with_seed_instruction(self.sol_account, self._seed, self.balance, self.size) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreateAccountTxStage(NeonTxStage): NAME = 'createNeonAccount' @@ -98,7 +98,7 @@ def build(self): self.tx.add(self._create_account()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreateERC20TxStage(NeonTxStage, abc.ABC): NAME = 'createERC20Account' @@ -124,7 +124,7 @@ def build(self): self.tx.add(self._create_erc20_account()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreateContractTxStage(NeonCreateAccountWithSeedStage, abc.ABC): NAME = 'createNeonContract' @@ -150,7 +150,7 @@ def build(self): self.tx.add(self._create_account()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonResizeContractTxStage(NeonCreateAccountWithSeedStage, abc.ABC): NAME = 'resizeNeonContract' diff --git a/proxy/mempool/operator_resource_list.py b/proxy/mempool/operator_resource_list.py index 2ee58ab78..c4153c68a 100644 --- a/proxy/mempool/operator_resource_list.py +++ b/proxy/mempool/operator_resource_list.py @@ -42,7 +42,7 @@ def secret_key(self) -> bytes: return self.signer.secret_key() -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class OperatorResourceList: # These variables are global for class, they will be initialized one time _manager = mp.Manager() @@ -282,7 +282,7 @@ def free_resource_info(self): self._free_resource_list.append(resource.idx) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonCreatePermAccount(NeonCreateAccountWithSeedStage, abc.ABC): NAME = 'createPermAccount' diff --git a/proxy/mempool/transaction_sender.py b/proxy/mempool/transaction_sender.py index 1d233414c..9c33fc9c3 100644 --- a/proxy/mempool/transaction_sender.py +++ b/proxy/mempool/transaction_sender.py @@ -30,7 +30,7 @@ from .operator_resource_list import OperatorResourceInfo -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NeonTxSender: def __init__(self, db: MemDB, solana: SolanaInteractor, eth_tx: EthTx, steps: int): self._db = db @@ -205,7 +205,7 @@ def done_account_tx_list(self, skip_create_accounts=False): self.create_account_tx.instructions.clear() -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class BaseNeonTxStrategy(metaclass=abc.ABCMeta): NAME = 'UNKNOWN STRATEGY' @@ -259,7 +259,7 @@ def _validate_gas_limit(self): return False -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class SimpleNeonTxSender(SolTxListSender): def __init__(self, strategy: BaseNeonTxStrategy, *args, **kwargs): SolTxListSender.__init__(self, *args, **kwargs) @@ -282,7 +282,7 @@ def _on_post_send(self): raise RuntimeError('Run out of attempts to execute transaction') -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class SimpleNeonTxStrategy(BaseNeonTxStrategy, abc.ABC): NAME = 'CallFromRawEthereumTX' IS_SIMPLE = True @@ -329,7 +329,7 @@ def execute(self) -> (NeonTxResultInfo, [str]): return tx_sender.neon_res, tx_sender.success_sign_list -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class IterativeNeonTxSender(SimpleNeonTxSender): def __init__(self, *args, **kwargs): SimpleNeonTxSender.__init__(self, *args, **kwargs) @@ -443,7 +443,7 @@ def _on_post_send(self): self._tx_list.append(self._strategy.build_tx()) -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class IterativeNeonTxStrategy(BaseNeonTxStrategy, abc.ABC): NAME = 'PartialCallOrContinueFromRawEthereumTX' IS_SIMPLE = False @@ -492,7 +492,7 @@ def execute(self) -> (NeonTxResultInfo, [str]): return tx_sender.neon_res, tx_sender.success_sign_list -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class HolderNeonTxStrategy(IterativeNeonTxStrategy, abc.ABC): NAME = 'ExecuteTrxFromAccountDataIterativeOrContinue' @@ -531,7 +531,7 @@ def _build_preparation_tx_list(self) -> [TransactionWithComputeBudget]: return tx_list -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class NoChainIdNeonTxStrategy(HolderNeonTxStrategy, abc.ABC): NAME = 'ExecuteTrxFromAccountDataIterativeOrContinueNoChainId' diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py index 334c1a039..32f29530f 100644 --- a/proxy/neon_proxy_app.py +++ b/proxy/neon_proxy_app.py @@ -1,5 +1,5 @@ from .proxy import entry_point -from .mempool.mempool_service import MemPoolService +from .mempool.mempool_service import MPService from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer from .common_neon.config import Config @@ -9,7 +9,7 @@ class NeonProxyApp: def __init__(self): self._config = Config() - self._mempool_service = MemPoolService(self._config) + self._mempool_service = MPService(self._config) def start(self): self._mempool_service.start() diff --git a/proxy/neon_rpc_api_model/neon_rpc_api_model.py b/proxy/neon_rpc_api_model/neon_rpc_api_model.py index 57e6d22f1..01cc51ba7 100644 --- a/proxy/neon_rpc_api_model/neon_rpc_api_model.py +++ b/proxy/neon_rpc_api_model/neon_rpc_api_model.py @@ -6,7 +6,7 @@ from typing import Optional, Union, Tuple import sha3 -from logged_groups import logged_group +from logged_groups import logged_group, LogMng from web3.auto import w3 from ..common_neon.address import EthereumAddress @@ -24,7 +24,7 @@ CHAIN_ID, USE_EARLIEST_BLOCK_IF_0_PASSED, neon_cli, EVM_STEP_COUNT from ..memdb.memdb import MemDB from ..statistics_exporter.proxy_metrics_interface import StatisticsExporter -from ..mempool import MemPoolTxRequest, MemPoolClient, MEMPOOL_SERVICE_HOST, MEMPOOL_SERVICE_PORT +from ..mempool import MemPoolClient, MP_SERVICE_HOST, MP_SERVICE_PORT from .transaction_validator import NeonTxValidator @@ -49,7 +49,7 @@ def __init__(self): self._solana = SolanaInteractor(SOLANA_URL) self._db = MemDB(self._solana) self._stat_exporter: Optional[StatisticsExporter] = None - self._mempool_client = MemPoolClient(MEMPOOL_SERVICE_HOST, MEMPOOL_SERVICE_PORT) + self._mempool_client = MemPoolClient(MP_SERVICE_HOST, MP_SERVICE_PORT) if PP_SOLANA_URL == SOLANA_URL: self.gas_price_calculator = GasPriceCalculator(self._solana, PYTH_MAPPING_ACCOUNT) @@ -214,8 +214,8 @@ def eth_getBalance(self, account: str, tag: str) -> str: return hex(0) return hex(neon_account_info.balance) - except (Exception,): - # self.debug(f"eth_getBalance: Can't get account info: {err}") + except (Exception,) as err: + self.debug(f"eth_getBalance: Can't get account info: {err}") return hex(0) def eth_getLogs(self, obj): @@ -383,11 +383,20 @@ def eth_call(self, obj: dict, tag: str) -> str: def eth_getTransactionCount(self, account: str, tag: str) -> str: self._validate_block_tag(tag) - account = self._normalize_account(account) + account = self._normalize_account(account).lower() try: + self.debug(f"Get transaction count. Account: {account}, tag: {tag}") + pending_trx_count = 0 + if tag == "pending": + req_id = LogMng.get_logging_context().get("req_id") + pending_trx_count = self._mempool_client.get_pending_tx_count(req_id=req_id, sender=account) + self.debug(f"Pending tx count for: {account} - is: {pending_trx_count}") + neon_account_info = self._solana.get_neon_account_info(account) - return hex(neon_account_info.trx_count) + trx_count = neon_account_info.trx_count + pending_trx_count + + return hex(trx_count) except (Exception,): # self.debug(f"eth_getTransactionCount: Can't get account info: {err}") return hex(0) @@ -480,17 +489,14 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str: try: neon_tx_cfg, emulating_result = self.precheck(trx) - # tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT) - # with OperatorResourceList(tx_sender): - # tx_sender.execute(neon_tx_cfg) - self._stat_tx_success() - mempool_tx_request = MemPoolTxRequest(neon_tx=trx, - neon_tx_exec_cfg=neon_tx_cfg, - emulating_result=emulating_result) + req_id = LogMng.get_logging_context().get("req_id") - if not self._mempool_client.send_raw_transaction(mempool_tx_request): - raise Exception("Failed to pass neon_tx into MemPool") + self._mempool_client.send_raw_transaction(req_id=req_id, + signature=eth_signature, + neon_tx=trx, + neon_tx_exec_cfg=neon_tx_cfg, + emulating_result=emulating_result) return eth_signature except PendingTxError as err: diff --git a/proxy/testing/test_eth_getLogs.py b/proxy/testing/test_eth_getLogs.py index 12795d8d2..a92e212d7 100644 --- a/proxy/testing/test_eth_getLogs.py +++ b/proxy/testing/test_eth_getLogs.py @@ -46,6 +46,7 @@ class Test_eth_getLogs(unittest.TestCase): + @classmethod def setUpClass(cls): print("\n\n") @@ -102,10 +103,9 @@ def commit_transactions(self): self.commit_two_event_trx(self, 5, 6) self.commit_no_event_trx(self, 7, 8) self.commit_no_event_trx(self, 9, 0) - pass def commit_one_event_trx(self, x, y) -> None: - print("\ncommit_one_event_trx") + print(f"\ncommit_one_event_trx. x: {x}, y: {y}") right_nonce = proxy.eth.get_transaction_count(proxy.eth.default_account) trx_store = self.storage_contract.functions.addReturnEvent(x, y).buildTransaction({'nonce': right_nonce}) trx_store_signed = proxy.eth.account.sign_transaction(trx_store, eth_account.key) @@ -120,7 +120,7 @@ def commit_one_event_trx(self, x, y) -> None: self.topics.append(topic.hex()) def commit_two_event_trx(self, x, y) -> None: - print("\ncommit_two_event_trx") + print(f"\ncommit_two_event_trx. x: {x}, y: {y}") right_nonce = proxy.eth.get_transaction_count(proxy.eth.default_account) trx_store = self.storage_contract.functions.addReturnEventTwice(x, y).buildTransaction({'nonce': right_nonce}) trx_store_signed = proxy.eth.account.sign_transaction(trx_store, eth_account.key) @@ -146,7 +146,6 @@ def commit_no_event_trx(self, x, y) -> None: self.block_hashes_no_event.append(trx_store_receipt['blockHash'].hex()) self.block_numbers_no_event.append(hex(trx_store_receipt['blockNumber'])) - def test_get_logs_by_blockHash(self): print("\ntest_get_logs_by_blockHash") receipts = proxy.eth.get_logs({'blockHash': self.block_hashes[0]}) @@ -155,24 +154,25 @@ def test_get_logs_by_blockHash(self): def test_get_no_logs_by_blockHash(self): print("\ntest_get_no_logs_by_blockHash") - receipts = proxy.eth.get_logs({'blockHash': self.block_hashes_no_event[0]}) + receipts = proxy.eth.get_logs({'blockHash': self.block_hashes_no_event[0], + 'address': self.storage_contract.address}) print('receipts: ', receipts) self.assertEqual(len(receipts), 0) def test_get_logs_by_fromBlock(self): - print("\ntest_get_logs_by_fromBlock") - receipts = proxy.eth.get_logs({'fromBlock': self.block_numbers[2]}) + from_block = self.block_numbers[2] + print(f"\ntest_get_logs_by_fromBlock: {from_block}, by storage contract address: {self.storage_contract.address}") + receipts = proxy.eth.get_logs({'fromBlock': from_block, + 'address': self.storage_contract.address}) print('receipts: ', receipts) self.assertEqual(len(receipts), 4) def test_get_logs_complex_request(self): print("\ntest_get_logs_complex_request") - receipts = proxy.eth.get_logs({ - 'fromBlock': 0, - 'toBlock': 'latest', - 'address': self.storage_contract.address, - 'topics': self.topics, - }) + receipts = proxy.eth.get_logs({'fromBlock': 0, + 'toBlock': 'latest', + 'address': self.storage_contract.address, + 'topics': self.topics}) print('receipts: ', receipts) self.assertEqual(len(receipts), 6) @@ -182,5 +182,6 @@ def test_get_logs_by_address(self): print('receipts: ', receipts) self.assertEqual(len(receipts), 6) + if __name__ == '__main__': unittest.main() diff --git a/proxy/testing/test_neon_tx_sender.py b/proxy/testing/test_neon_tx_sender.py index 7154cd3e3..238ae92d0 100644 --- a/proxy/testing/test_neon_tx_sender.py +++ b/proxy/testing/test_neon_tx_sender.py @@ -44,10 +44,10 @@ def test_01_validate_execution_when_not_enough_sols(self): self._resource_list._min_operator_balance_to_warn.side_effect = [1_049_000_000 * 1_000_000_000 * 1_000_000_000 * 2, 1_000_000_000 * 2] self._resource_list._min_operator_balance_to_err.side_effect = [1_049_000_000 * 1_000_000_000 * 1_000_000_000, 1_000_000_000] - with self.assertLogs('neon', level='ERROR') as logs: + with self.assertLogs('neon.MemPool', level='ERROR') as logs: with self._resource_list: print('logs.output:', str(logs.output)) - self.assertRegex(str(logs.output), 'ERROR:neon.Proxy:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') + self.assertRegex(str(logs.output), 'ERROR:neon.MemPool:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') # @unittest.skip("a.i.") def test_02_validate_warning_when_little_sols(self): @@ -60,10 +60,10 @@ def test_02_validate_warning_when_little_sols(self): self._resource_list._min_operator_balance_to_warn.side_effect = [1_049_000_000 * 1_000_000_000 * 1_000_000_000, 1_000_000_000 * 2] self._resource_list._min_operator_balance_to_err.side_effect = [1_049_049_000, 1_000_000_000] - with self.assertLogs('neon', level='WARNING') as logs: + with self.assertLogs('neon.MemPool', level='WARNING') as logs: with self._resource_list: print('logs.output:', str(logs.output)) - self.assertRegex(str(logs.output), 'WARNING:neon.Proxy:Operator account [A-Za-z0-9]{40,}:[0-9]+ SOLs are running out; balance = [0-9]+; min_operator_balance_to_warn = 1049000000000000000000000000; min_operator_balance_to_err = 1049049000;') + self.assertRegex(str(logs.output), 'WARNING:neon.MemPool:Operator account [A-Za-z0-9]{40,}:[0-9]+ SOLs are running out; balance = [0-9]+; min_operator_balance_to_warn = 1049000000000000000000000000; min_operator_balance_to_err = 1049049000;') # @unittest.skip("a.i.") def test_03_validate_execution_when_not_enough_sols_for_all_operator_accounts(self): @@ -78,11 +78,11 @@ def test_03_validate_execution_when_not_enough_sols_for_all_operator_accounts(se self._resource_list._min_operator_balance_to_warn.return_value = 1_049_000_000 * 1_000_000_000 * 1_000_000_000 * 2 self._resource_list._min_operator_balance_to_err.return_value = 1_049_000_000 * 1_000_000_000 * 1_000_000_000 - with self.assertLogs('neon', level='ERROR') as logs: + with self.assertLogs('neon.MemPool', level='ERROR') as logs: with self.assertRaises(RuntimeError, msg='Operator has NO resources!'): with self._resource_list: pass print('logs.output:', str(logs.output)) - self.assertRegex(str(logs.output), 'ERROR:neon.Proxy:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') + self.assertRegex(str(logs.output), 'ERROR:neon.MemPool:Operator account [A-Za-z0-9]{40,}:[0-9]+ has NOT enough SOLs; balance = [0-9]+; min_operator_balance_to_err = 1049000000000000000000000000') diff --git a/requirements.txt b/requirements.txt index 56809c7b1..7fce90475 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ ethereum py-solc-x==1.1.0 flask prometheus_client==0.13.1 -git+https://github.com/neonlabsorg/python-logged-groups.git@2.1.4 +git+https://github.com/neonlabsorg/python-logged-groups.git@2.1.5