From f42284864d207406224671964167ef70f06c101c Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sat, 9 Apr 2022 19:41:35 +0400 Subject: [PATCH 01/15] Froze decision --- TestQueueServer.py | 12 ++++++ proxy/__main__.py | 30 +++++++-------- proxy/common_neon/base_process.py | 61 +++++++++++++++++++++++++++++++ proxy/mempool/MemPollService.py | 28 ++++++++++++++ proxy/neon_proxy_app.py | 18 +++++++++ 5 files changed, 134 insertions(+), 15 deletions(-) create mode 100644 TestQueueServer.py create mode 100644 proxy/common_neon/base_process.py create mode 100644 proxy/mempool/MemPollService.py create mode 100644 proxy/neon_proxy_app.py diff --git a/TestQueueServer.py b/TestQueueServer.py new file mode 100644 index 000000000..3931617f3 --- /dev/null +++ b/TestQueueServer.py @@ -0,0 +1,12 @@ +from multiprocessing.managers import BaseManager +from queue import Queue +queue = Queue() + +class QueueManager(BaseManager): + pass + +QueueManager.register('get_queue', callable=lambda:queue) + +m = QueueManager(address=('', 9091), authkey=b'abracadabra') +s = m.get_server() +s.serve_forever() diff --git a/proxy/__main__.py b/proxy/__main__.py index 7f2f56788..6b42261d5 100644 --- a/proxy/__main__.py +++ b/proxy/__main__.py @@ -9,24 +9,24 @@ :license: BSD, see LICENSE for more details. """ -from .proxy import entry_point -import os -from .indexer.indexer import run_indexer +import os +# from .indexer.indexer import run_indexer +from .neon_proxy_app import NeonProxyApp if __name__ == '__main__': - solana_url = os.environ['SOLANA_URL'] - evm_loader_id = os.environ['EVM_LOADER'] - print(f"Will run with SOLANA_URL={solana_url}; EVM_LOADER={evm_loader_id}") + # solana_url = os.environ['SOLANA_URL'] + # evm_loader_id = os.environ['EVM_LOADER'] + # print(f"Will run with SOLANA_URL={solana_url}; EVM_LOADER={evm_loader_id}") + # + # indexer_mode = os.environ.get('INDEXER_MODE', 'False').lower() in [1, 'true', 'True'] + # + # if indexer_mode: + # print("Will run in indexer mode") + # run_indexer(solana_url) + # else: + neon_proxy_app = NeonProxyApp() + neon_proxy_app.start() - indexer_mode = os.environ.get('INDEXER_MODE', 'False').lower() in [1, 'true', 'True'] - if indexer_mode: - print("Will run in indexer mode") - run_indexer(solana_url) - else: - from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer - PrometheusProxyServer() - print("Will run in proxy mode") - entry_point() diff --git a/proxy/common_neon/base_process.py b/proxy/common_neon/base_process.py new file mode 100644 index 000000000..187eb2f07 --- /dev/null +++ b/proxy/common_neon/base_process.py @@ -0,0 +1,61 @@ +import multiprocessing as mp +import queue +from dataclasses import dataclass, astuple +from typing import Tuple, Dict, Any +from abc import ABC, abstractmethod + + +@dataclass +class Invocation: + handler: str + args: Tuple[Any] + kvargs: Dict[str, Any] + + +class BaseProcess(mp.Process): + """A process backed by an internal queue for simple one-way message passing""" + + QUEUE_TIMEOUT_SEC = 1 + BREAK_PROC_INVOCATION = 0 + + def __init__(self, *args, **kwargs): + self.__queue = mp.Queue + super().__init__(*args, **kwargs) + self._queue = mp.Queue() + self._timeout = BaseProcess.QUEUE_TIMEOUT_SEC + + def send(self, handler, *args, **kvargs): + """Puts the event and args as a `Msg` on the queue""" + invocation = Invocation(handler=handler, args=args, kvargs=kvargs) + self._queue.put(invocation) + + def finish(self): + self._queue.put(BaseProcess.BREAK_PROC_INVOCATION) + + def dispatch(self, invocation: Invocation): + handler, args, kvargs = astuple(invocation) + + handler = getattr(self, handler, None) + if handler is None: + raise NotImplementedError(f"Process has no handler for {handler}") + + handler(*args, **kvargs) + + def run(self): + while True: + try: + if not self._run_impl(): + break + except queue.Empty: + self.do_extras() + + def _run_impl(self) -> bool: + invocation = self._queue.get(block=True, timeout=self._timeout) + if invocation == BaseProcess.BREAK_PROC_INVOCATION: + return False + self.dispatch(invocation) + return True + + @abstractmethod + def do_extras(self): + assert "Should be implemented" diff --git a/proxy/mempool/MemPollService.py b/proxy/mempool/MemPollService.py new file mode 100644 index 000000000..818fc9f69 --- /dev/null +++ b/proxy/mempool/MemPollService.py @@ -0,0 +1,28 @@ +from ..common_neon.base_process import BaseProcess +from logged_groups import logged_group +from multiprocessing.managers import BaseManager +import multiprocessing as mp + + +@logged_group("neon.Proxy") +class MemPoolService: + + def __init__(self): + self._queue = mp.Queue() + + class MemPoolQueueManager(BaseManager): + pass + MemPoolQueueManager.register("get_queue", callable=lambda: self._queue) + self.queue_manager = MemPoolQueueManager(address=('', 9091), authkey=b'abracadabra') + + def start(self): + self.debug("Starting queue server") + mempool_server = self.queue_manager.get_server() + mempool_server.serve_forever() + + + # def on_eth_send_raw_transaction(self, trx): + # self.debug(f"Got raw transaction to send: {trx}") + # + # def do_extras(self): + # self.debug("Do extras") diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py new file mode 100644 index 000000000..f8764000d --- /dev/null +++ b/proxy/neon_proxy_app.py @@ -0,0 +1,18 @@ +# from .proxy import entry_point +from .mempool.MemPollService import MemPoolService +# from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer + + +class NeonProxyApp: + + def __init__(self): + self._mem_pool_service = MemPoolService() + + def start(self): + # PrometheusProxyServer() + self._mem_pool_service.start() + # entry_point() + + def __del__(self): + self._mem_pool_service.finish() + self._mem_pool_service.join(timeout=5) From 97190e9d665f04e9138f0f74dee6383a1207a715 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 17:35:32 +0400 Subject: [PATCH 02/15] Stable version of Mempool service --- mem_pool_service.py | 8 ++ proxy/__main__.py | 27 +++--- .../common_neon/utils/queue_based_service.py | 97 +++++++++++++++++++ proxy/common_neon/{ => utils}/utils.py | 5 +- proxy/mempool/MemPollService.py | 28 ------ proxy/mempool/__init__.py | 3 + proxy/mempool/mem_pool.py | 34 +++++++ proxy/mempool/mempool_client.py | 16 +++ proxy/mempool/mempool_service.py | 25 +++++ proxy/neon_proxy_app.py | 23 ++--- proxy/plugin/solana_rest_api.py | 9 +- 11 files changed, 218 insertions(+), 57 deletions(-) create mode 100644 mem_pool_service.py create mode 100644 proxy/common_neon/utils/queue_based_service.py rename proxy/common_neon/{ => utils}/utils.py (98%) delete mode 100644 proxy/mempool/MemPollService.py create mode 100644 proxy/mempool/__init__.py create mode 100644 proxy/mempool/mem_pool.py create mode 100644 proxy/mempool/mempool_client.py create mode 100644 proxy/mempool/mempool_service.py diff --git a/mem_pool_service.py b/mem_pool_service.py new file mode 100644 index 000000000..633a248cf --- /dev/null +++ b/mem_pool_service.py @@ -0,0 +1,8 @@ +from proxy.mempool.mempool_service import MemPoolService + +mempool_service = MemPoolService() + +mempool_service.start() +mempool_service.wait() + + diff --git a/proxy/__main__.py b/proxy/__main__.py index 6b42261d5..58c4d259f 100644 --- a/proxy/__main__.py +++ b/proxy/__main__.py @@ -11,22 +11,23 @@ import os -# from .indexer.indexer import run_indexer +from .indexer.indexer import run_indexer from .neon_proxy_app import NeonProxyApp if __name__ == '__main__': - # solana_url = os.environ['SOLANA_URL'] - # evm_loader_id = os.environ['EVM_LOADER'] - # print(f"Will run with SOLANA_URL={solana_url}; EVM_LOADER={evm_loader_id}") - # - # indexer_mode = os.environ.get('INDEXER_MODE', 'False').lower() in [1, 'true', 'True'] - # - # if indexer_mode: - # print("Will run in indexer mode") - # run_indexer(solana_url) - # else: - neon_proxy_app = NeonProxyApp() - neon_proxy_app.start() + solana_url = os.environ['SOLANA_URL'] + evm_loader_id = os.environ['EVM_LOADER'] + print(f"Will run with SOLANA_URL={solana_url}; EVM_LOADER={evm_loader_id}") + + indexer_mode = os.environ.get('INDEXER_MODE', 'False').lower() in [1, 'true', 'True'] + + if indexer_mode: + print("Will run in indexer mode") + run_indexer(solana_url) + else: + neon_proxy_app = NeonProxyApp() + neon_proxy_app.start() + diff --git a/proxy/common_neon/utils/queue_based_service.py b/proxy/common_neon/utils/queue_based_service.py new file mode 100644 index 000000000..b2550ef2b --- /dev/null +++ b/proxy/common_neon/utils/queue_based_service.py @@ -0,0 +1,97 @@ +import abc +import multiprocessing as mp +from multiprocessing.managers import BaseManager +from dataclasses import dataclass, astuple, field +from typing import Tuple, Dict, Any + +from logged_groups import logged_group + + +@dataclass +class ServiceInvocation: + method_name: str = None + args: Tuple[Any] = field(default_factory=tuple) + kwargs: Dict[str, Any] = field(default_factory=dict) + + +class QueueBasedServiceClient: + + def __init__(self, host: str, port: int): + class MemPoolQueueManager(BaseManager): + pass + + MemPoolQueueManager.register('get_queue') + queue_manager = MemPoolQueueManager(address=(host, port), authkey=b'abracadabra') + queue_manager.connect() + self._queue = queue_manager.get_queue() + + def invoke(self, method_name, *args, **kwargs): + invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs) + + self._queue.put(invocation) + + +@logged_group("neon") +class QueueBasedService(abc.ABC): + + QUEUE_TIMEOUT_SEC = 10 + BREAK_PROC_INVOCATION = 0 + JOIN_PROC_TIMEOUT_SEC = 5 + + def __init__(self, port: int): + self._queue = mp.Queue() + self._port = port + + class MemPoolQueueManager(BaseManager): + pass + + MemPoolQueueManager.register("get_queue", callable=lambda: self._queue) + self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra') + self._mempool_server = self._queue_manager.get_server() + self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever) + self._timeout = self.QUEUE_TIMEOUT_SEC + + def start(self): + self.info(f"Starting queue server: {self._port}") + self._mempool_server_process.start() + self.run() + + def run(self): + while True: + try: + if not self._run_impl(): + break + except BaseException as e: + self.do_extras() + self.info("Processing has been finished") + + def _run_impl(self) -> bool: + + invocation = self._queue.get(block=True, timeout=self._timeout) + if invocation == self.BREAK_PROC_INVOCATION: + return False + self.dispatch(invocation) + return True + + def dispatch(self, invocation: ServiceInvocation): + method_name, args, kwargs = astuple(invocation) + handler = getattr(self, method_name, None) + if handler is None: + raise NotImplementedError(f"Process has no handler for {handler}") + handler(*args, **kwargs) + + def finish(self): + self._queue.put(self.BREAK_PROC_INVOCATION) + # self._mempool_queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC) + # self._mempool_service_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC) + + @abc.abstractmethod + def do_extras(self): + assert "To be implemented in derived class" + + def wait(self): + self._mempool_queue_process.join() + self._mempool_service_process.join() + + def __del__(self): + self.finish() diff --git a/proxy/common_neon/utils.py b/proxy/common_neon/utils/utils.py similarity index 98% rename from proxy/common_neon/utils.py rename to proxy/common_neon/utils/utils.py index 56e4f3e3d..d25960285 100644 --- a/proxy/common_neon/utils.py +++ b/proxy/common_neon/utils/utils.py @@ -6,9 +6,10 @@ from eth_utils import big_endian_to_int -from ..environment import EVM_LOADER_ID +#TODO: move it out from here +from ...environment import EVM_LOADER_ID -from ..common_neon.eth_proto import Trx as EthTx +from ...common_neon.eth_proto import Trx as EthTx def str_fmt_object(obj): diff --git a/proxy/mempool/MemPollService.py b/proxy/mempool/MemPollService.py deleted file mode 100644 index 818fc9f69..000000000 --- a/proxy/mempool/MemPollService.py +++ /dev/null @@ -1,28 +0,0 @@ -from ..common_neon.base_process import BaseProcess -from logged_groups import logged_group -from multiprocessing.managers import BaseManager -import multiprocessing as mp - - -@logged_group("neon.Proxy") -class MemPoolService: - - def __init__(self): - self._queue = mp.Queue() - - class MemPoolQueueManager(BaseManager): - pass - MemPoolQueueManager.register("get_queue", callable=lambda: self._queue) - self.queue_manager = MemPoolQueueManager(address=('', 9091), authkey=b'abracadabra') - - def start(self): - self.debug("Starting queue server") - mempool_server = self.queue_manager.get_server() - mempool_server.serve_forever() - - - # def on_eth_send_raw_transaction(self, trx): - # self.debug(f"Got raw transaction to send: {trx}") - # - # def do_extras(self): - # self.debug("Do extras") diff --git a/proxy/mempool/__init__.py b/proxy/mempool/__init__.py new file mode 100644 index 000000000..44cec9438 --- /dev/null +++ b/proxy/mempool/__init__.py @@ -0,0 +1,3 @@ +# from .mempool_service import MemPoolService +# from .mempool_client import MemPoolClient +# from .mem_pool import MemPool diff --git a/proxy/mempool/mem_pool.py b/proxy/mempool/mem_pool.py new file mode 100644 index 000000000..0229a9e13 --- /dev/null +++ b/proxy/mempool/mem_pool.py @@ -0,0 +1,34 @@ +from logged_groups import logged_group +from multiprocessing import Pool + + +@logged_group("neon.Proxy") +class MemPool: + + POOL_PROC_COUNT = 4 + + def __init__(self): + self._pool = Pool(processes=self.POOL_PROC_COUNT) + + def on_eth_send_raw_transaction(self, *, eth_trx_hash): + self._pool.apply_async(self._on_eth_send_raw_transaction_impl, (eth_trx_hash,), {}, self.on_sending_trx_proceed, self.error_callback) + + def error_callback(self, error): + self.error("Failed to invoke the function on worker process: ", error) + + def on_sending_trx_proceed(self, result): + self.debug(f"Sending transaction proceed: {result}") + + def _on_eth_send_raw_transaction_impl(self, eth_trx_hash): + self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}") + + def do_extras(self): + pass + + def __getstate__(self): + self_dict = self.__dict__.copy() + del self_dict['_pool'] + return self_dict + + def __setstate__(self, state): + self.__dict__.update(state) diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py new file mode 100644 index 000000000..6f5391a1c --- /dev/null +++ b/proxy/mempool/mempool_client.py @@ -0,0 +1,16 @@ +from proxy.common_neon.utils import QueueBasedServiceClient +from logged_groups import logged_group +from .mempool_service import MemPoolService + + +@logged_group("neon") +class MemPoolClient(QueueBasedServiceClient): + + MEM_POOL_SERVICE_HOST = "127.0.0.1" + + def __init__(self): + self.info("Construct MemPoolClient") + QueueBasedServiceClient.__init__(self, "localhost", 9091) + + def on_eth_send_raw_transaction(self, eth_trx_signature): + self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature) diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py new file mode 100644 index 000000000..ad2d5934a --- /dev/null +++ b/proxy/mempool/mempool_service.py @@ -0,0 +1,25 @@ +from logged_groups import logged_group + +from ..common_neon.utils import QueueBasedService + +from .mem_pool import MemPool + + +@logged_group("neon") +class MemPoolService(QueueBasedService): + + MEM_POOL_SERVICE_PORT = 9091 + + def __init__(self): + QueueBasedService.__init__(self, self.MEM_POOL_SERVICE_PORT) + self._mem_pool = MemPool() + + def on_eth_send_raw_transaction(self, *, eth_trx_hash): + self._mem_pool.on_eth_send_raw_transaction(eth_trx_hash=eth_trx_hash) + + def do_extras(self): + self._mem_pool.do_extras() + + def __del__(self): + self.info("Delete Mempool service") + diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py index f8764000d..3e296cbbe 100644 --- a/proxy/neon_proxy_app.py +++ b/proxy/neon_proxy_app.py @@ -1,18 +1,19 @@ -# from .proxy import entry_point -from .mempool.MemPollService import MemPoolService -# from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer +from .proxy import entry_point +from .mempool.mempool_service import MemPoolService +from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer +import multiprocessing as mp class NeonProxyApp: def __init__(self): - self._mem_pool_service = MemPoolService() + self._mem_pool_process = mp.Process(target=NeonProxyApp.run_mempool_service) - def start(self): - # PrometheusProxyServer() - self._mem_pool_service.start() - # entry_point() + @staticmethod + def run_mempool_service(): + MemPoolService().start() - def __del__(self): - self._mem_pool_service.finish() - self._mem_pool_service.join(timeout=5) + def start(self): + PrometheusProxyServer() + self._mem_pool_process.start() + entry_point() diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 8d6fb3523..91e7ec2da 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -17,14 +17,14 @@ import sha3 from logged_groups import logged_group, logging_context -from typing import Optional, Union +from typing import Union from ..common.utils import build_http_response from ..http.codes import httpStatusCodes from ..http.parser import HttpParser from ..http.websocket import WebsocketFrame from ..http.server import HttpWebServerBasePlugin, httpProtocolTypes -from typing import Dict, List, Tuple, Optional +from typing import List, Tuple, Optional from ..common_neon.transaction_sender import NeonTxSender from ..common_neon.solana_interactor import SolanaInteractor @@ -35,10 +35,10 @@ from ..common_neon.estimate import GasEstimate from ..common_neon.utils import SolanaBlockInfo from ..common_neon.keys_storage import KeyStorage +from ..mempool.mempool_client import MemPoolClient from ..environment import SOLANA_URL, PP_SOLANA_URL, PYTH_MAPPING_ACCOUNT, EVM_STEP_COUNT, CHAIN_ID, ENABLE_PRIVATE_API from ..environment import NEON_EVM_VERSION, NEON_EVM_REVISION from ..environment import neon_cli -from ..environment import get_solana_accounts from ..memdb.memdb import MemDB from .gas_price_calculator import GasPriceCalculator from ..common_neon.eth_proto import Trx as EthTrx @@ -61,6 +61,7 @@ class EthereumModel: def __init__(self): self._solana = SolanaInteractor(SOLANA_URL) self._db = MemDB(self._solana) + self._mempool_client = MemPoolClient() if PP_SOLANA_URL == SOLANA_URL: self.gas_price_calculator = GasPriceCalculator(self._solana, PYTH_MAPPING_ACCOUNT) @@ -454,9 +455,11 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str: self._stat_tx_begin() try: + #TODO: move it into MemPoolService tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT) tx_sender.execute() self._stat_tx_success() + self._mempool_client.on_eth_send_raw_transaction(eth_signature) return eth_signature except PendingTxError as err: From 1a70b77059c835822ea73290adf89eb48b0b944f Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:33:01 +0400 Subject: [PATCH 03/15] Handle sigint on QueueBasedService --- proxy/common_neon/utils/__init__.py | 3 ++ .../common_neon/utils/queue_based_service.py | 36 ++++++++++++------- 2 files changed, 26 insertions(+), 13 deletions(-) create mode 100644 proxy/common_neon/utils/__init__.py diff --git a/proxy/common_neon/utils/__init__.py b/proxy/common_neon/utils/__init__.py new file mode 100644 index 000000000..d45ecaed9 --- /dev/null +++ b/proxy/common_neon/utils/__init__.py @@ -0,0 +1,3 @@ +from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation +from .utils import * + diff --git a/proxy/common_neon/utils/queue_based_service.py b/proxy/common_neon/utils/queue_based_service.py index b2550ef2b..7c3091211 100644 --- a/proxy/common_neon/utils/queue_based_service.py +++ b/proxy/common_neon/utils/queue_based_service.py @@ -1,5 +1,8 @@ import abc import multiprocessing as mp +import os +import signal + from multiprocessing.managers import BaseManager from dataclasses import dataclass, astuple, field from typing import Tuple, Dict, Any @@ -38,9 +41,10 @@ class QueueBasedService(abc.ABC): BREAK_PROC_INVOCATION = 0 JOIN_PROC_TIMEOUT_SEC = 5 - def __init__(self, port: int): + def __init__(self, port: int, is_main_proc: bool): self._queue = mp.Queue() self._port = port + self._is_main_proc = is_main_proc class MemPoolQueueManager(BaseManager): pass @@ -48,22 +52,28 @@ class MemPoolQueueManager(BaseManager): MemPoolQueueManager.register("get_queue", callable=lambda: self._queue) self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra') self._mempool_server = self._queue_manager.get_server() - self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever) + self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_service") + self._queue_process = mp.Process(target=self.run, name="mempool_queue_proc") self._timeout = self.QUEUE_TIMEOUT_SEC + pid = os.getpid() + signal.signal(signal.SIGINT, lambda sif, frame: self.finish() if os.getpid() == pid else 0) + def start(self): self.info(f"Starting queue server: {self._port}") self._mempool_server_process.start() - self.run() + self._queue_process.start() + if self._is_main_proc: + self._queue_process.join() def run(self): + self.service_process_init() while True: try: if not self._run_impl(): break except BaseException as e: self.do_extras() - self.info("Processing has been finished") def _run_impl(self) -> bool: @@ -81,17 +91,17 @@ def dispatch(self, invocation: ServiceInvocation): handler(*args, **kwargs) def finish(self): - self._queue.put(self.BREAK_PROC_INVOCATION) - # self._mempool_queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC) - # self._mempool_service_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC) + self.info("Finishing the queue and listening processes") + self._mempool_server_process.terminate() + if not self._queue_process.is_alive(): + return + self._queue.put_nowait(self.BREAK_PROC_INVOCATION) + self._queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC) @abc.abstractmethod def do_extras(self): assert "To be implemented in derived class" - def wait(self): - self._mempool_queue_process.join() - self._mempool_service_process.join() - - def __del__(self): - self.finish() + @abc.abstractmethod + def service_process_init(self): + assert "To be implemented in derived class" From 1903f4412c02c2857e3231244e020aa92ba526d8 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:33:57 +0400 Subject: [PATCH 04/15] Spit and polish --- proxy/mempool/mem_pool.py | 11 ++++++----- proxy/mempool/mempool_service.py | 13 +++++++------ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/proxy/mempool/mem_pool.py b/proxy/mempool/mem_pool.py index 0229a9e13..232466654 100644 --- a/proxy/mempool/mem_pool.py +++ b/proxy/mempool/mem_pool.py @@ -5,19 +5,20 @@ @logged_group("neon.Proxy") class MemPool: - POOL_PROC_COUNT = 4 + POOL_PROC_COUNT = 20 def __init__(self): self._pool = Pool(processes=self.POOL_PROC_COUNT) def on_eth_send_raw_transaction(self, *, eth_trx_hash): - self._pool.apply_async(self._on_eth_send_raw_transaction_impl, (eth_trx_hash,), {}, self.on_sending_trx_proceed, self.error_callback) + self._pool.apply_async(func=self._on_eth_send_raw_transaction_impl, args=(eth_trx_hash,), + callback=self.on_eth_send_raw_transaction_callback, error_callback=self.error_callback) def error_callback(self, error): - self.error("Failed to invoke the function on worker process: ", error) + self.error("Failed to invoke on worker process: ", error) - def on_sending_trx_proceed(self, result): - self.debug(f"Sending transaction proceed: {result}") + def on_eth_send_raw_transaction_callback(self, result): + pass def _on_eth_send_raw_transaction_impl(self, eth_trx_hash): self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}") diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index ad2d5934a..d7dc64ea1 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -11,15 +11,16 @@ class MemPoolService(QueueBasedService): MEM_POOL_SERVICE_PORT = 9091 def __init__(self): - QueueBasedService.__init__(self, self.MEM_POOL_SERVICE_PORT) - self._mem_pool = MemPool() + QueueBasedService.__init__(self, self.MEM_POOL_SERVICE_PORT, True) + self._mem_pool = None def on_eth_send_raw_transaction(self, *, eth_trx_hash): self._mem_pool.on_eth_send_raw_transaction(eth_trx_hash=eth_trx_hash) - def do_extras(self): - self._mem_pool.do_extras() + # QueueBasedService abstracts - def __del__(self): - self.info("Delete Mempool service") + def service_process_init(self): + self._mem_pool = MemPool() + def do_extras(self): + self._mem_pool.do_extras() From 9bb0ccfae9a1d8b6d3f048ab3558ebfea67e7b58 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:35:19 +0400 Subject: [PATCH 05/15] Remove mem_pool_service.py from git --- mem_pool_service.py | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 mem_pool_service.py diff --git a/mem_pool_service.py b/mem_pool_service.py deleted file mode 100644 index 633a248cf..000000000 --- a/mem_pool_service.py +++ /dev/null @@ -1,8 +0,0 @@ -from proxy.mempool.mempool_service import MemPoolService - -mempool_service = MemPoolService() - -mempool_service.start() -mempool_service.wait() - - From 6058114511116c81f32b5509dd7675c0a32aad81 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:38:34 +0400 Subject: [PATCH 06/15] Get rif of useless TestQueueServer.py --- TestQueueServer.py | 12 ------------ 1 file changed, 12 deletions(-) delete mode 100644 TestQueueServer.py diff --git a/TestQueueServer.py b/TestQueueServer.py deleted file mode 100644 index 3931617f3..000000000 --- a/TestQueueServer.py +++ /dev/null @@ -1,12 +0,0 @@ -from multiprocessing.managers import BaseManager -from queue import Queue -queue = Queue() - -class QueueManager(BaseManager): - pass - -QueueManager.register('get_queue', callable=lambda:queue) - -m = QueueManager(address=('', 9091), authkey=b'abracadabra') -s = m.get_server() -s.serve_forever() From 386db34f15efd7440f590908df2000e9fd5ba0a3 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:42:42 +0400 Subject: [PATCH 07/15] Get rif of useless base_process.py --- proxy/common_neon/base_process.py | 61 ------------------------------- 1 file changed, 61 deletions(-) delete mode 100644 proxy/common_neon/base_process.py diff --git a/proxy/common_neon/base_process.py b/proxy/common_neon/base_process.py deleted file mode 100644 index 187eb2f07..000000000 --- a/proxy/common_neon/base_process.py +++ /dev/null @@ -1,61 +0,0 @@ -import multiprocessing as mp -import queue -from dataclasses import dataclass, astuple -from typing import Tuple, Dict, Any -from abc import ABC, abstractmethod - - -@dataclass -class Invocation: - handler: str - args: Tuple[Any] - kvargs: Dict[str, Any] - - -class BaseProcess(mp.Process): - """A process backed by an internal queue for simple one-way message passing""" - - QUEUE_TIMEOUT_SEC = 1 - BREAK_PROC_INVOCATION = 0 - - def __init__(self, *args, **kwargs): - self.__queue = mp.Queue - super().__init__(*args, **kwargs) - self._queue = mp.Queue() - self._timeout = BaseProcess.QUEUE_TIMEOUT_SEC - - def send(self, handler, *args, **kvargs): - """Puts the event and args as a `Msg` on the queue""" - invocation = Invocation(handler=handler, args=args, kvargs=kvargs) - self._queue.put(invocation) - - def finish(self): - self._queue.put(BaseProcess.BREAK_PROC_INVOCATION) - - def dispatch(self, invocation: Invocation): - handler, args, kvargs = astuple(invocation) - - handler = getattr(self, handler, None) - if handler is None: - raise NotImplementedError(f"Process has no handler for {handler}") - - handler(*args, **kvargs) - - def run(self): - while True: - try: - if not self._run_impl(): - break - except queue.Empty: - self.do_extras() - - def _run_impl(self) -> bool: - invocation = self._queue.get(block=True, timeout=self._timeout) - if invocation == BaseProcess.BREAK_PROC_INVOCATION: - return False - self.dispatch(invocation) - return True - - @abstractmethod - def do_extras(self): - assert "Should be implemented" From a3927982238423d1538d8d3ca0c49be269d95cf6 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:47:25 +0400 Subject: [PATCH 08/15] Spit and polish --- proxy/common_neon/utils/utils.py | 2 +- proxy/mempool/mempool_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/common_neon/utils/utils.py b/proxy/common_neon/utils/utils.py index d25960285..db837cbab 100644 --- a/proxy/common_neon/utils/utils.py +++ b/proxy/common_neon/utils/utils.py @@ -9,7 +9,7 @@ #TODO: move it out from here from ...environment import EVM_LOADER_ID -from ...common_neon.eth_proto import Trx as EthTx +from ..eth_proto import Trx as EthTx def str_fmt_object(obj): diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index 6f5391a1c..b9c8e3372 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -10,7 +10,7 @@ class MemPoolClient(QueueBasedServiceClient): def __init__(self): self.info("Construct MemPoolClient") - QueueBasedServiceClient.__init__(self, "localhost", 9091) + QueueBasedServiceClient.__init__(self, self.MEM_POOL_SERVICE_HOST, MemPoolService.MEM_POOL_SERVICE_PORT) def on_eth_send_raw_transaction(self, eth_trx_signature): self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature) From 84b9c23f867adb4a4ba538efc416578da23c3480 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:56:59 +0400 Subject: [PATCH 09/15] Rework is_background key property --- proxy/common_neon/utils/queue_based_service.py | 6 +++--- proxy/mempool/mempool_service.py | 4 ++-- proxy/neon_proxy_app.py | 9 ++------- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/proxy/common_neon/utils/queue_based_service.py b/proxy/common_neon/utils/queue_based_service.py index 7c3091211..44366688c 100644 --- a/proxy/common_neon/utils/queue_based_service.py +++ b/proxy/common_neon/utils/queue_based_service.py @@ -41,10 +41,10 @@ class QueueBasedService(abc.ABC): BREAK_PROC_INVOCATION = 0 JOIN_PROC_TIMEOUT_SEC = 5 - def __init__(self, port: int, is_main_proc: bool): + def __init__(self, *, port: int, is_background: bool): self._queue = mp.Queue() self._port = port - self._is_main_proc = is_main_proc + self._is_back_ground = is_background class MemPoolQueueManager(BaseManager): pass @@ -63,7 +63,7 @@ def start(self): self.info(f"Starting queue server: {self._port}") self._mempool_server_process.start() self._queue_process.start() - if self._is_main_proc: + if not self._is_back_ground: self._queue_process.join() def run(self): diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index d7dc64ea1..42adef4c4 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -10,8 +10,8 @@ class MemPoolService(QueueBasedService): MEM_POOL_SERVICE_PORT = 9091 - def __init__(self): - QueueBasedService.__init__(self, self.MEM_POOL_SERVICE_PORT, True) + def __init__(self, *, is_background: bool): + QueueBasedService.__init__(self, port=self.MEM_POOL_SERVICE_PORT, is_background=is_background) self._mem_pool = None def on_eth_send_raw_transaction(self, *, eth_trx_hash): diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py index 3e296cbbe..c7a1c636e 100644 --- a/proxy/neon_proxy_app.py +++ b/proxy/neon_proxy_app.py @@ -1,19 +1,14 @@ from .proxy import entry_point from .mempool.mempool_service import MemPoolService from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer -import multiprocessing as mp class NeonProxyApp: def __init__(self): - self._mem_pool_process = mp.Process(target=NeonProxyApp.run_mempool_service) - - @staticmethod - def run_mempool_service(): - MemPoolService().start() + self._mempool_service = MemPoolService(is_background=True) def start(self): PrometheusProxyServer() - self._mem_pool_process.start() + self._mempool_service.start() entry_point() From 9c46badaf7cc4147522e4b332dba772694186b9e Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 19:58:59 +0400 Subject: [PATCH 10/15] Set queue timeout sec --- proxy/common_neon/utils/queue_based_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/common_neon/utils/queue_based_service.py b/proxy/common_neon/utils/queue_based_service.py index 44366688c..df5806336 100644 --- a/proxy/common_neon/utils/queue_based_service.py +++ b/proxy/common_neon/utils/queue_based_service.py @@ -37,7 +37,7 @@ def invoke(self, method_name, *args, **kwargs): @logged_group("neon") class QueueBasedService(abc.ABC): - QUEUE_TIMEOUT_SEC = 10 + QUEUE_TIMEOUT_SEC = 0.4 BREAK_PROC_INVOCATION = 0 JOIN_PROC_TIMEOUT_SEC = 5 From c851c554ccbb9b7ef3688e4d1870e7acc5238a7d Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 20:01:08 +0400 Subject: [PATCH 11/15] Spit and polish --- proxy/mempool/__init__.py | 6 +++--- proxy/mempool/mempool_client.py | 2 +- proxy/plugin/solana_rest_api.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/proxy/mempool/__init__.py b/proxy/mempool/__init__.py index 44cec9438..adc9b5cae 100644 --- a/proxy/mempool/__init__.py +++ b/proxy/mempool/__init__.py @@ -1,3 +1,3 @@ -# from .mempool_service import MemPoolService -# from .mempool_client import MemPoolClient -# from .mem_pool import MemPool +from .mempool_service import MemPoolService +from .mempool_client import MemPoolClient +from .mem_pool import MemPool diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index b9c8e3372..528868280 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -1,6 +1,6 @@ from proxy.common_neon.utils import QueueBasedServiceClient from logged_groups import logged_group -from .mempool_service import MemPoolService +from . import MemPoolService @logged_group("neon") diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 91e7ec2da..6f7e6b879 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -35,7 +35,7 @@ from ..common_neon.estimate import GasEstimate from ..common_neon.utils import SolanaBlockInfo from ..common_neon.keys_storage import KeyStorage -from ..mempool.mempool_client import MemPoolClient +from ..mempool import MemPoolClient from ..environment import SOLANA_URL, PP_SOLANA_URL, PYTH_MAPPING_ACCOUNT, EVM_STEP_COUNT, CHAIN_ID, ENABLE_PRIVATE_API from ..environment import NEON_EVM_VERSION, NEON_EVM_REVISION from ..environment import neon_cli From 60d2930eecb23c1bc4d323b05bb19d20ff9ac6dd Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 20:25:53 +0400 Subject: [PATCH 12/15] Spit and polish --- proxy/common_neon/__init__.py | 1 + proxy/common_neon/types.py | 9 +++++++ .../common_neon/utils/queue_based_service.py | 24 +++++++++++++------ proxy/mempool/mempool_client.py | 9 ++++--- proxy/plugin/solana_rest_api.py | 1 - 5 files changed, 33 insertions(+), 11 deletions(-) create mode 100644 proxy/common_neon/types.py diff --git a/proxy/common_neon/__init__.py b/proxy/common_neon/__init__.py index e69de29bb..ca2b699e8 100644 --- a/proxy/common_neon/__init__.py +++ b/proxy/common_neon/__init__.py @@ -0,0 +1 @@ +from .types import Result diff --git a/proxy/common_neon/types.py b/proxy/common_neon/types.py new file mode 100644 index 000000000..433444a30 --- /dev/null +++ b/proxy/common_neon/types.py @@ -0,0 +1,9 @@ +class Result: + def __init__(self, reason: str = None): + self._reason = reason + + def __bool__(self) -> bool: + return self._reason is None + + def __str__(self) -> str: + return self._reason if self._reason is not None else "" diff --git a/proxy/common_neon/utils/queue_based_service.py b/proxy/common_neon/utils/queue_based_service.py index df5806336..2afb45613 100644 --- a/proxy/common_neon/utils/queue_based_service.py +++ b/proxy/common_neon/utils/queue_based_service.py @@ -1,6 +1,7 @@ import abc import multiprocessing as mp import os +import queue import signal from multiprocessing.managers import BaseManager @@ -9,6 +10,8 @@ from logged_groups import logged_group +from ..types import Result + @dataclass class ServiceInvocation: @@ -17,6 +20,7 @@ class ServiceInvocation: kwargs: Dict[str, Any] = field(default_factory=dict) +@logged_group("neon") class QueueBasedServiceClient: def __init__(self, host: str, port: int): @@ -28,9 +32,16 @@ class MemPoolQueueManager(BaseManager): queue_manager.connect() self._queue = queue_manager.get_queue() - def invoke(self, method_name, *args, **kwargs): - invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs) + def invoke(self, method_name, *args, **kwargs) -> Result: + try: + self._invoke_impl(method_name, *args, **kwargs) + except queue.Full: + self.error(f"Failed to invoke the method: {method_name}, queue is full") + return Result("Mempool queue full") + return Result() + def _invoke_impl(self, method_name, *args, **kwargs): + invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs) self._queue.put(invocation) @@ -42,19 +53,19 @@ class QueueBasedService(abc.ABC): JOIN_PROC_TIMEOUT_SEC = 5 def __init__(self, *, port: int, is_background: bool): - self._queue = mp.Queue() self._port = port self._is_back_ground = is_background + self._timeout = self.QUEUE_TIMEOUT_SEC class MemPoolQueueManager(BaseManager): pass + self._queue = mp.Queue() MemPoolQueueManager.register("get_queue", callable=lambda: self._queue) self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra') self._mempool_server = self._queue_manager.get_server() - self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_service") + self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_listen_proc") self._queue_process = mp.Process(target=self.run, name="mempool_queue_proc") - self._timeout = self.QUEUE_TIMEOUT_SEC pid = os.getpid() signal.signal(signal.SIGINT, lambda sif, frame: self.finish() if os.getpid() == pid else 0) @@ -72,11 +83,10 @@ def run(self): try: if not self._run_impl(): break - except BaseException as e: + except queue.Empty: self.do_extras() def _run_impl(self) -> bool: - invocation = self._queue.get(block=True, timeout=self._timeout) if invocation == self.BREAK_PROC_INVOCATION: return False diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index 528868280..a919be3ff 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -1,5 +1,8 @@ -from proxy.common_neon.utils import QueueBasedServiceClient from logged_groups import logged_group + +from ..common_neon.utils import QueueBasedServiceClient +from ..common_neon import Result + from . import MemPoolService @@ -12,5 +15,5 @@ def __init__(self): self.info("Construct MemPoolClient") QueueBasedServiceClient.__init__(self, self.MEM_POOL_SERVICE_HOST, MemPoolService.MEM_POOL_SERVICE_PORT) - def on_eth_send_raw_transaction(self, eth_trx_signature): - self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature) + def on_eth_send_raw_transaction(self, eth_trx_signature) -> Result: + return self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature) diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 6f7e6b879..924eeaba7 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -455,7 +455,6 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str: self._stat_tx_begin() try: - #TODO: move it into MemPoolService tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT) tx_sender.execute() self._stat_tx_success() From b3389933c5676260ec2cb59441161dd12e632146 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 20:29:09 +0400 Subject: [PATCH 13/15] Spit and polish --- proxy/common_neon/utils/__init__.py | 2 +- proxy/mempool/mempool_client.py | 5 +++-- proxy/mempool/mempool_service.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/proxy/common_neon/utils/__init__.py b/proxy/common_neon/utils/__init__.py index d45ecaed9..818a2dea3 100644 --- a/proxy/common_neon/utils/__init__.py +++ b/proxy/common_neon/utils/__init__.py @@ -1,3 +1,3 @@ from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation -from .utils import * +# from .utils import * diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index a919be3ff..6cd8560af 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -12,8 +12,9 @@ class MemPoolClient(QueueBasedServiceClient): MEM_POOL_SERVICE_HOST = "127.0.0.1" def __init__(self): - self.info("Construct MemPoolClient") - QueueBasedServiceClient.__init__(self, self.MEM_POOL_SERVICE_HOST, MemPoolService.MEM_POOL_SERVICE_PORT) + port, host = (self.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST) + self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}") + QueueBasedServiceClient.__init__(self, host, port) def on_eth_send_raw_transaction(self, eth_trx_signature) -> Result: return self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature) diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index 42adef4c4..dd7590a34 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -2,7 +2,7 @@ from ..common_neon.utils import QueueBasedService -from .mem_pool import MemPool +from . import MemPool @logged_group("neon") From 01f86a5bf9e597ad8c30b9f74732e5c43d335af7 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Sun, 10 Apr 2022 20:31:27 +0400 Subject: [PATCH 14/15] Spit and polish --- proxy/common_neon/utils/__init__.py | 2 +- proxy/mempool/mempool_client.py | 2 +- proxy/mempool/mempool_service.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/proxy/common_neon/utils/__init__.py b/proxy/common_neon/utils/__init__.py index 818a2dea3..d45ecaed9 100644 --- a/proxy/common_neon/utils/__init__.py +++ b/proxy/common_neon/utils/__init__.py @@ -1,3 +1,3 @@ from .queue_based_service import QueueBasedService, QueueBasedServiceClient, ServiceInvocation -# from .utils import * +from .utils import * diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index 6cd8560af..154ff8455 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -12,7 +12,7 @@ class MemPoolClient(QueueBasedServiceClient): MEM_POOL_SERVICE_HOST = "127.0.0.1" def __init__(self): - port, host = (self.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST) + port, host = (MemPoolService.MEM_POOL_SERVICE_PORT, self.MEM_POOL_SERVICE_HOST) self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}") QueueBasedServiceClient.__init__(self, host, port) diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index dd7590a34..42adef4c4 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -2,7 +2,7 @@ from ..common_neon.utils import QueueBasedService -from . import MemPool +from .mem_pool import MemPool @logged_group("neon") From 3a695d5e3d622e9eb5f03d26194da02685b49890 Mon Sep 17 00:00:00 2001 From: rozhkovdmitrii Date: Mon, 11 Apr 2022 18:35:15 +0400 Subject: [PATCH 15/15] Reset logging name for MemPool components --- proxy/mempool/mem_pool.py | 2 +- proxy/mempool/mempool_client.py | 2 +- proxy/mempool/mempool_service.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/proxy/mempool/mem_pool.py b/proxy/mempool/mem_pool.py index 232466654..774ac02f7 100644 --- a/proxy/mempool/mem_pool.py +++ b/proxy/mempool/mem_pool.py @@ -2,7 +2,7 @@ from multiprocessing import Pool -@logged_group("neon.Proxy") +@logged_group("neon.MemPool") class MemPool: POOL_PROC_COUNT = 20 diff --git a/proxy/mempool/mempool_client.py b/proxy/mempool/mempool_client.py index 154ff8455..958b7ef09 100644 --- a/proxy/mempool/mempool_client.py +++ b/proxy/mempool/mempool_client.py @@ -6,7 +6,7 @@ from . import MemPoolService -@logged_group("neon") +@logged_group("neon.Proxy") class MemPoolClient(QueueBasedServiceClient): MEM_POOL_SERVICE_HOST = "127.0.0.1" diff --git a/proxy/mempool/mempool_service.py b/proxy/mempool/mempool_service.py index 42adef4c4..75c28d697 100644 --- a/proxy/mempool/mempool_service.py +++ b/proxy/mempool/mempool_service.py @@ -5,7 +5,7 @@ from .mem_pool import MemPool -@logged_group("neon") +@logged_group("neon.MemPool") class MemPoolService(QueueBasedService): MEM_POOL_SERVICE_PORT = 9091