diff --git a/proxy/__main__.py b/proxy/__main__.py index 809cb80f4..ed992ed24 100644 --- a/proxy/__main__.py +++ b/proxy/__main__.py @@ -9,8 +9,9 @@ :license: BSD, see LICENSE for more details. """ -from .proxy import entry_point + import os +from .neon_proxy_app import NeonProxyApp from .indexer.indexer_app import run_indexer @@ -25,8 +26,9 @@ print("Will run in indexer mode") run_indexer(solana_url) else: - from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer - PrometheusProxyServer() + neon_proxy_app = NeonProxyApp() + neon_proxy_app.start() + + + - print("Will run in proxy mode") - entry_point() diff --git a/proxy/common_neon/__init__.py b/proxy/common_neon/__init__.py index e69de29bb..ca2b699e8 100644 --- a/proxy/common_neon/__init__.py +++ b/proxy/common_neon/__init__.py @@ -0,0 +1 @@ +from .types import Result diff --git a/proxy/common_neon/types.py b/proxy/common_neon/types.py new file mode 100644 index 000000000..433444a30 --- /dev/null +++ b/proxy/common_neon/types.py @@ -0,0 +1,9 @@ +class Result: + def __init__(self, reason: str = None): + self._reason = reason + + def __bool__(self) -> bool: + return self._reason is None + + def __str__(self) -> str: + return self._reason if self._reason is not None else "" diff --git a/proxy/common_neon/utils/__init__.py b/proxy/common_neon/utils/__init__.py new file mode 100644 index 000000000..d45ecaed9 --- /dev/null +++ b/proxy/common_neon/utils/__init__.py @@ -0,0 +1,3 @@ +from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation +from .utils import * + diff --git a/proxy/common_neon/utils/queue_based_service.py b/proxy/common_neon/utils/queue_based_service.py new file mode 100644 index 000000000..2afb45613 --- /dev/null +++ b/proxy/common_neon/utils/queue_based_service.py @@ -0,0 +1,117 @@ +import abc +import multiprocessing as mp +import os +import queue +import signal + +from multiprocessing.managers import BaseManager +from dataclasses import dataclass, astuple, field +from typing import Tuple, Dict, Any + +from logged_groups import logged_group + +from ..types import Result + + +@dataclass +class ServiceInvocation: + method_name: str = None + args: Tuple[Any] = field(default_factory=tuple) + kwargs: Dict[str, Any] = field(default_factory=dict) + + +@logged_group("neon") +class QueueBasedServiceClient: + + def __init__(self, host: str, port: int): + class MemPoolQueueManager(BaseManager): + pass + + MemPoolQueueManager.register('get_queue') + queue_manager = MemPoolQueueManager(address=(host, port), authkey=b'abracadabra') + queue_manager.connect() + self._queue = queue_manager.get_queue() + + def invoke(self, method_name, *args, **kwargs) -> Result: + try: + self._invoke_impl(method_name, *args, **kwargs) + except queue.Full: + self.error(f"Failed to invoke the method: {method_name}, queue is full") + return Result("Mempool queue full") + return Result() + + def _invoke_impl(self, method_name, *args, **kwargs): + invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs) + self._queue.put(invocation) + + +@logged_group("neon") +class QueueBasedService(abc.ABC): + + QUEUE_TIMEOUT_SEC = 0.4 + BREAK_PROC_INVOCATION = 0 + JOIN_PROC_TIMEOUT_SEC = 5 + + def __init__(self, *, port: int, is_background: bool): + self._port = port + self._is_back_ground = is_background + self._timeout = self.QUEUE_TIMEOUT_SEC + + class MemPoolQueueManager(BaseManager): + pass + + self._queue = mp.Queue() + MemPoolQueueManager.register("get_queue", callable=lambda: self._queue) + self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra') + self._mempool_server = self._queue_manager.get_server() + self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_listen_proc") + self._queue_process = mp.Process(target=self.run, name="mempool_queue_proc") + + pid = os.getpid() + signal.signal(signal.SIGINT, lambda sif, frame: self.finish() if os.getpid() == pid else 0) + + def start(self): + self.info(f"Starting queue server: {self._port}") + self._mempool_server_process.start() + self._queue_process.start() + if not self._is_back_ground: + self._queue_process.join() + + def run(self): + self.service_process_init() + while True: + try: + if not self._run_impl(): + break + except queue.Empty: + self.do_extras() + + def _run_impl(self) -> bool: + invocation = self._queue.get(block=True, timeout=self._timeout) + if invocation == self.BREAK_PROC_INVOCATION: + return False + self.dispatch(invocation) + return True + + def dispatch(self, invocation: ServiceInvocation): + method_name, args, kwargs = astuple(invocation) + handler = getattr(self, method_name, None) + if handler is None: + raise NotImplementedError(f"Process has no handler for {handler}") + handler(*args, **kwargs) + + def finish(self): + self.info("Finishing the queue and listening processes") + self._mempool_server_process.terminate() + if not self._queue_process.is_alive(): + return + self._queue.put_nowait(self.BREAK_PROC_INVOCATION) + self._queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC) + + @abc.abstractmethod + def do_extras(self): + assert "To be implemented in derived class" + + @abc.abstractmethod + def service_process_init(self): + assert "To be implemented in derived class" diff --git a/proxy/common_neon/utils.py b/proxy/common_neon/utils/utils.py similarity index 98% rename from proxy/common_neon/utils.py rename to proxy/common_neon/utils/utils.py index 56e4f3e3d..db837cbab 100644 --- a/proxy/common_neon/utils.py +++ b/proxy/common_neon/utils/utils.py @@ -6,9 +6,10 @@ from eth_utils import big_endian_to_int -from ..environment import EVM_LOADER_ID +#TODO: move it out from here +from ...environment import EVM_LOADER_ID -from ..common_neon.eth_proto import Trx as EthTx +from ..eth_proto import Trx as EthTx def str_fmt_object(obj): diff --git a/proxy/mempool/__init__.py b/proxy/mempool/__init__.py new file mode 100644 index 000000000..adc9b5cae --- /dev/null +++ b/proxy/mempool/__init__.py @@ -0,0 +1,3 @@ +from .mempool_service import MemPoolService +from .mempool_client import MemPoolClient +from .mem_pool import MemPool diff --git a/proxy/mempool/mem_pool.py b/proxy/mempool/mem_pool.py new file mode 100644 index 000000000..774ac02f7 --- /dev/null +++ b/proxy/mempool/mem_pool.py @@ -0,0 +1,35 @@ +from logged_groups import logged_group +from multiprocessing import Pool + + +@logged_group("neon.MemPool") +class MemPool: + + POOL_PROC_COUNT = 20 + + def __init__(self): + self._pool = Pool(processes=self.POOL_PROC_COUNT) + + def on_eth_send_raw_transaction(self, *, eth_trx_hash): + self._pool.apply_async(func=self._on_eth_send_raw_transaction_impl, args=(eth_trx_hash,), + callback=self.on_eth_send_raw_transaction_callback, error_callback=self.error_callback) + + def error_callback(self, error): + self.error("Failed to invoke on worker process: ", error) + + def on_eth_send_raw_transaction_callback(self, result): + pass + + def _on_eth_send_raw_transaction_impl(self, eth_trx_hash): + self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}") + + def do_extras(self): + pass + + def __getstate__(self): + self_dict = self.__dict__.copy() + del self_dict['_pool'] + return self_dict + + def __setstate__(self, state): + self.__dict__.update(state) diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py new file mode 100644 index 000000000..958b7ef09 --- /dev/null +++ b/proxy/mempool/mempool_client.py @@ -0,0 +1,20 @@ +from logged_groups import logged_group + +from ..common_neon.utils import QueueBasedServiceClient +from ..common_neon import Result + +from . import MemPoolService + + +@logged_group("neon.Proxy") +class MemPoolClient(QueueBasedServiceClient): + + MEM_POOL_SERVICE_HOST = "127.0.0.1" + + def __init__(self): + port, host = (MemPoolService.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST) + self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}") + QueueBasedServiceClient.__init__(self, host, port) + + def on_eth_send_raw_transaction(self, eth_trx_signature) -> Result: + return self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature) diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py new file mode 100644 index 000000000..75c28d697 --- /dev/null +++ b/proxy/mempool/mempool_service.py @@ -0,0 +1,26 @@ +from logged_groups import logged_group + +from ..common_neon.utils import QueueBasedService + +from .mem_pool import MemPool + + +@logged_group("neon.MemPool") +class MemPoolService(QueueBasedService): + + MEM_POOL_SERVICE_PORT = 9091 + + def __init__(self, *, is_background: bool): + QueueBasedService.__init__(self, port=self.MEM_POOL_SERVICE_PORT, is_background=is_background) + self._mem_pool = None + + def on_eth_send_raw_transaction(self, *, eth_trx_hash): + self._mem_pool.on_eth_send_raw_transaction(eth_trx_hash=eth_trx_hash) + + # QueueBasedService abstracts + + def service_process_init(self): + self._mem_pool = MemPool() + + def do_extras(self): + self._mem_pool.do_extras() diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py new file mode 100644 index 000000000..c7a1c636e --- /dev/null +++ b/proxy/neon_proxy_app.py @@ -0,0 +1,14 @@ +from .proxy import entry_point +from .mempool.mempool_service import MemPoolService +from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer + + +class NeonProxyApp: + + def __init__(self): + self._mempool_service = MemPoolService(is_background=True) + + def start(self): + PrometheusProxyServer() + self._mempool_service.start() + entry_point() diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 8d6fb3523..924eeaba7 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -17,14 +17,14 @@ import sha3 from logged_groups import logged_group, logging_context -from typing import Optional, Union +from typing import Union from ..common.utils import build_http_response from ..http.codes import httpStatusCodes from ..http.parser import HttpParser from ..http.websocket import WebsocketFrame from ..http.server import HttpWebServerBasePlugin, httpProtocolTypes -from typing import Dict, List, Tuple, Optional +from typing import List, Tuple, Optional from ..common_neon.transaction_sender import NeonTxSender from ..common_neon.solana_interactor import SolanaInteractor @@ -35,10 +35,10 @@ from ..common_neon.estimate import GasEstimate from ..common_neon.utils import SolanaBlockInfo from ..common_neon.keys_storage import KeyStorage +from ..mempool import MemPoolClient from ..environment import SOLANA_URL, PP_SOLANA_URL, PYTH_MAPPING_ACCOUNT, EVM_STEP_COUNT, CHAIN_ID, ENABLE_PRIVATE_API from ..environment import NEON_EVM_VERSION, NEON_EVM_REVISION from ..environment import neon_cli -from ..environment import get_solana_accounts from ..memdb.memdb import MemDB from .gas_price_calculator import GasPriceCalculator from ..common_neon.eth_proto import Trx as EthTrx @@ -61,6 +61,7 @@ class EthereumModel: def __init__(self): self._solana = SolanaInteractor(SOLANA_URL) self._db = MemDB(self._solana) + self._mempool_client = MemPoolClient() if PP_SOLANA_URL == SOLANA_URL: self.gas_price_calculator = GasPriceCalculator(self._solana, PYTH_MAPPING_ACCOUNT) @@ -457,6 +458,7 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str: tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT) tx_sender.execute() self._stat_tx_success() + self._mempool_client.on_eth_send_raw_transaction(eth_signature) return eth_signature except PendingTxError as err: