Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Stable version of Mempool service
  • Loading branch information
rozhkovdmitrii committed Apr 10, 2022
commit 97190e9d665f04e9138f0f74dee6383a1207a715
8 changes: 8 additions & 0 deletions mem_pool_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from proxy.mempool.mempool_service import MemPoolService

mempool_service = MemPoolService()

mempool_service.start()
mempool_service.wait()


27 changes: 14 additions & 13 deletions proxy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@


import os
# from .indexer.indexer import run_indexer
from .indexer.indexer import run_indexer
from .neon_proxy_app import NeonProxyApp

if __name__ == '__main__':
# solana_url = os.environ['SOLANA_URL']
# evm_loader_id = os.environ['EVM_LOADER']
# print(f"Will run with SOLANA_URL={solana_url}; EVM_LOADER={evm_loader_id}")
#
# indexer_mode = os.environ.get('INDEXER_MODE', 'False').lower() in [1, 'true', 'True']
#
# if indexer_mode:
# print("Will run in indexer mode")
# run_indexer(solana_url)
# else:
neon_proxy_app = NeonProxyApp()
neon_proxy_app.start()
solana_url = os.environ['SOLANA_URL']
evm_loader_id = os.environ['EVM_LOADER']
print(f"Will run with SOLANA_URL={solana_url}; EVM_LOADER={evm_loader_id}")

indexer_mode = os.environ.get('INDEXER_MODE', 'False').lower() in [1, 'true', 'True']

if indexer_mode:
print("Will run in indexer mode")
run_indexer(solana_url)
else:
neon_proxy_app = NeonProxyApp()
neon_proxy_app.start()




97 changes: 97 additions & 0 deletions proxy/common_neon/utils/queue_based_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import abc
import multiprocessing as mp
from multiprocessing.managers import BaseManager
from dataclasses import dataclass, astuple, field
from typing import Tuple, Dict, Any

from logged_groups import logged_group


@dataclass
class ServiceInvocation:
method_name: str = None
args: Tuple[Any] = field(default_factory=tuple)
kwargs: Dict[str, Any] = field(default_factory=dict)


class QueueBasedServiceClient:

def __init__(self, host: str, port: int):
class MemPoolQueueManager(BaseManager):
pass

MemPoolQueueManager.register('get_queue')
queue_manager = MemPoolQueueManager(address=(host, port), authkey=b'abracadabra')
queue_manager.connect()
self._queue = queue_manager.get_queue()

def invoke(self, method_name, *args, **kwargs):
invocation = ServiceInvocation(method_name=method_name, args=args, kwargs=kwargs)

self._queue.put(invocation)


@logged_group("neon")
class QueueBasedService(abc.ABC):

QUEUE_TIMEOUT_SEC = 10
BREAK_PROC_INVOCATION = 0
JOIN_PROC_TIMEOUT_SEC = 5

def __init__(self, port: int):
self._queue = mp.Queue()
self._port = port

class MemPoolQueueManager(BaseManager):
pass

MemPoolQueueManager.register("get_queue", callable=lambda: self._queue)
self._queue_manager = MemPoolQueueManager(address=('', port), authkey=b'abracadabra')
self._mempool_server = self._queue_manager.get_server()
self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever)
self._timeout = self.QUEUE_TIMEOUT_SEC

def start(self):
self.info(f"Starting queue server: {self._port}")
self._mempool_server_process.start()
self.run()

def run(self):
while True:
try:
if not self._run_impl():
break
except BaseException as e:
self.do_extras()
self.info("Processing has been finished")

def _run_impl(self) -> bool:

invocation = self._queue.get(block=True, timeout=self._timeout)
if invocation == self.BREAK_PROC_INVOCATION:
return False
self.dispatch(invocation)
return True

def dispatch(self, invocation: ServiceInvocation):
method_name, args, kwargs = astuple(invocation)
handler = getattr(self, method_name, None)
if handler is None:
raise NotImplementedError(f"Process has no handler for {handler}")
handler(*args, **kwargs)

def finish(self):
self._queue.put(self.BREAK_PROC_INVOCATION)
# self._mempool_queue_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC)
# self._mempool_service_process.join(timeout=self.JOIN_PROC_TIMEOUT_SEC)

@abc.abstractmethod
def do_extras(self):
assert "To be implemented in derived class"

def wait(self):
self._mempool_queue_process.join()
self._mempool_service_process.join()

def __del__(self):
self.finish()
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from eth_utils import big_endian_to_int

from ..environment import EVM_LOADER_ID
#TODO: move it out from here
from ...environment import EVM_LOADER_ID

from ..common_neon.eth_proto import Trx as EthTx
from ...common_neon.eth_proto import Trx as EthTx


def str_fmt_object(obj):
Expand Down
28 changes: 0 additions & 28 deletions proxy/mempool/MemPollService.py

This file was deleted.

3 changes: 3 additions & 0 deletions proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# from .mempool_service import MemPoolService
# from .mempool_client import MemPoolClient
# from .mem_pool import MemPool
34 changes: 34 additions & 0 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from logged_groups import logged_group
from multiprocessing import Pool


@logged_group("neon.Proxy")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think better to add a new logged_group 'neon.Mempool'.
This will show next stages in transaction processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

class MemPool:

POOL_PROC_COUNT = 4

def __init__(self):
self._pool = Pool(processes=self.POOL_PROC_COUNT)

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._pool.apply_async(self._on_eth_send_raw_transaction_impl, (eth_trx_hash,), {}, self.on_sending_trx_proceed, self.error_callback)

def error_callback(self, error):
self.error("Failed to invoke the function on worker process: ", error)

def on_sending_trx_proceed(self, result):
self.debug(f"Sending transaction proceed: {result}")

def _on_eth_send_raw_transaction_impl(self, eth_trx_hash):
self.debug(f"Transaction is being processed on the worker: {eth_trx_hash}")

def do_extras(self):
pass

def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['_pool']
return self_dict

def __setstate__(self, state):
self.__dict__.update(state)
16 changes: 16 additions & 0 deletions proxy/mempool/mempool_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from proxy.common_neon.utils import QueueBasedServiceClient
from logged_groups import logged_group
from .mempool_service import MemPoolService


@logged_group("neon")
class MemPoolClient(QueueBasedServiceClient):

MEM_POOL_SERVICE_HOST = "127.0.0.1"

def __init__(self):
self.info("Construct MemPoolClient")
QueueBasedServiceClient.__init__(self, "localhost", 9091)

def on_eth_send_raw_transaction(self, eth_trx_signature):
self.invoke("on_eth_send_raw_transaction", eth_trx_hash=eth_trx_signature)
25 changes: 25 additions & 0 deletions proxy/mempool/mempool_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from logged_groups import logged_group

from ..common_neon.utils import QueueBasedService

from .mem_pool import MemPool


@logged_group("neon")
class MemPoolService(QueueBasedService):

MEM_POOL_SERVICE_PORT = 9091

def __init__(self):
QueueBasedService.__init__(self, self.MEM_POOL_SERVICE_PORT)
self._mem_pool = MemPool()

def on_eth_send_raw_transaction(self, *, eth_trx_hash):
self._mem_pool.on_eth_send_raw_transaction(eth_trx_hash=eth_trx_hash)

def do_extras(self):
self._mem_pool.do_extras()

def __del__(self):
self.info("Delete Mempool service")

23 changes: 12 additions & 11 deletions proxy/neon_proxy_app.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
# from .proxy import entry_point
from .mempool.MemPollService import MemPoolService
# from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer
from .proxy import entry_point
from .mempool.mempool_service import MemPoolService
from .statistics_exporter.prometheus_proxy_server import PrometheusProxyServer
import multiprocessing as mp


class NeonProxyApp:

def __init__(self):
self._mem_pool_service = MemPoolService()
self._mem_pool_process = mp.Process(target=NeonProxyApp.run_mempool_service)

def start(self):
# PrometheusProxyServer()
self._mem_pool_service.start()
# entry_point()
@staticmethod
def run_mempool_service():
MemPoolService().start()

def __del__(self):
self._mem_pool_service.finish()
self._mem_pool_service.join(timeout=5)
def start(self):
PrometheusProxyServer()
self._mem_pool_process.start()
entry_point()
9 changes: 6 additions & 3 deletions proxy/plugin/solana_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import sha3

from logged_groups import logged_group, logging_context
from typing import Optional, Union
from typing import Union

from ..common.utils import build_http_response
from ..http.codes import httpStatusCodes
from ..http.parser import HttpParser
from ..http.websocket import WebsocketFrame
from ..http.server import HttpWebServerBasePlugin, httpProtocolTypes
from typing import Dict, List, Tuple, Optional
from typing import List, Tuple, Optional

from ..common_neon.transaction_sender import NeonTxSender
from ..common_neon.solana_interactor import SolanaInteractor
Expand All @@ -35,10 +35,10 @@
from ..common_neon.estimate import GasEstimate
from ..common_neon.utils import SolanaBlockInfo
from ..common_neon.keys_storage import KeyStorage
from ..mempool.mempool_client import MemPoolClient
from ..environment import SOLANA_URL, PP_SOLANA_URL, PYTH_MAPPING_ACCOUNT, EVM_STEP_COUNT, CHAIN_ID, ENABLE_PRIVATE_API
from ..environment import NEON_EVM_VERSION, NEON_EVM_REVISION
from ..environment import neon_cli
from ..environment import get_solana_accounts
from ..memdb.memdb import MemDB
from .gas_price_calculator import GasPriceCalculator
from ..common_neon.eth_proto import Trx as EthTrx
Expand All @@ -61,6 +61,7 @@ class EthereumModel:
def __init__(self):
self._solana = SolanaInteractor(SOLANA_URL)
self._db = MemDB(self._solana)
self._mempool_client = MemPoolClient()

if PP_SOLANA_URL == SOLANA_URL:
self.gas_price_calculator = GasPriceCalculator(self._solana, PYTH_MAPPING_ACCOUNT)
Expand Down Expand Up @@ -454,9 +455,11 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str:
self._stat_tx_begin()

try:
#TODO: move it into MemPoolService
tx_sender = NeonTxSender(self._db, self._solana, trx, steps=EVM_STEP_COUNT)
tx_sender.execute()
self._stat_tx_success()
self._mempool_client.on_eth_send_raw_transaction(eth_signature)
return eth_signature

except PendingTxError as err:
Expand Down