Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
36173d3
Get rid of queue based service
rozhkovdmitrii Apr 16, 2022
30e11cf
Move all data types into data.py
rozhkovdmitrii Apr 16, 2022
cbfc8bb
Simplify start mechanism
rozhkovdmitrii Apr 16, 2022
5444085
Update neon_rpc_api_model
rozhkovdmitrii Apr 16, 2022
d956dc7
mempool init
rozhkovdmitrii Apr 16, 2022
a2ada65
update mempool
rozhkovdmitrii Apr 16, 2022
6e810fe
Get rid of second getting price int test_integration_success_read_price
rozhkovdmitrii Apr 16, 2022
4675a2c
Bring processing onto asyncio
rozhkovdmitrii Apr 17, 2022
76b73d7
Implement asyncio event loop on the MemPool
rozhkovdmitrii Apr 17, 2022
dc11a9d
rename mempool_server.py
rozhkovdmitrii Apr 17, 2022
89d63c8
Correct some things
rozhkovdmitrii Apr 17, 2022
6ae513b
Spit and polish
rozhkovdmitrii Apr 17, 2022
4bfd337
Spit and polish
rozhkovdmitrii Apr 17, 2022
7d49886
Merge branch '712-mempool' into 712-restructure-processing
rozhkovdmitrii Apr 17, 2022
817df22
Spit and polish
rozhkovdmitrii Apr 18, 2022
39ad9c9
Move pickable_data server into utils
rozhkovdmitrii Apr 18, 2022
22b0ac7
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 21, 2022
8b9a821
Introduce a variety of approaches to transfer a pickable data, config
rozhkovdmitrii Apr 21, 2022
017bcde
Spit on MemPool and polish a little
rozhkovdmitrii Apr 21, 2022
1df5786
Move transaction sending functionality
rozhkovdmitrii Apr 21, 2022
3b725ca
.
rozhkovdmitrii Apr 21, 2022
d4b68d7
Bring emulating beck to transaction_validator
rozhkovdmitrii Apr 21, 2022
df68f14
Resolve some remarks made by @a_falaleev
rozhkovdmitrii Apr 21, 2022
406aa6b
spit and polish
rozhkovdmitrii Apr 21, 2022
88376f8
Rename neon_tx_exec_cfg
rozhkovdmitrii Apr 21, 2022
971cb11
Turn down full test suite
rozhkovdmitrii Apr 22, 2022
9dc5d44
Save changes as they are
rozhkovdmitrii Apr 23, 2022
2c0f4eb
Fix pickable data client
rozhkovdmitrii Apr 23, 2022
b4e33e5
Spit ans polish
rozhkovdmitrii Apr 25, 2022
153f3a3
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
634b358
Merge branch '712-mempool' into 713-make-get_receipt-working
rozhkovdmitrii Apr 25, 2022
2464609
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
18af023
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 26, 2022
779019f
Get rid of endless request processing
rozhkovdmitrii Apr 26, 2022
f0d9f1a
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
6dfc5d1
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
dcf699b
Just reraise exception when failed to send
rozhkovdmitrii Apr 29, 2022
e78153b
Just try to fix logs
rozhkovdmitrii Apr 29, 2022
a467a3d
Logging logs
rozhkovdmitrii Apr 29, 2022
60b713a
Provide some extra logging
rozhkovdmitrii Apr 30, 2022
925d6b2
Just add address to eth_getLogs tests filter
rozhkovdmitrii May 1, 2022
f3c99bb
Fix test_neon_tx_sender.py
rozhkovdmitrii May 1, 2022
cdcca71
enable FTS
rozhkovdmitrii May 1, 2022
5366a53
revert some changes
rozhkovdmitrii May 13, 2022
c6b6935
fix one error
rozhkovdmitrii May 13, 2022
07ccac2
fix counters store
rozhkovdmitrii May 13, 2022
a3bf585
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 16, 2022
9f37b0f
Lowercase account before getting transaction count
rozhkovdmitrii May 17, 2022
bd668e1
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
62d553f
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
0b6686b
Rollback some changes
rozhkovdmitrii May 18, 2022
3544bd2
Rollback some changes
rozhkovdmitrii May 18, 2022
260e8f7
Spit and polish
rozhkovdmitrii May 19, 2022
8234bb8
Spit and polish
rozhkovdmitrii May 19, 2022
0a13155
Spit and polish
rozhkovdmitrii May 19, 2022
dbf5ca1
Spit and polish
rozhkovdmitrii May 19, 2022
388f028
Spit and polish
rozhkovdmitrii May 19, 2022
8e7eaf4
Spit and polish
rozhkovdmitrii May 19, 2022
93051f0
Spit and polish
rozhkovdmitrii May 19, 2022
728880a
Spit and polish
rozhkovdmitrii May 19, 2022
3d71178
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 25, 2022
92aa0f6
Renaming
rozhkovdmitrii May 31, 2022
01544a3
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 31, 2022
080b46a
Merge branch '712-mempool' into 712-mempool-renaming
rozhkovdmitrii Jun 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Bring processing onto asyncio
  • Loading branch information
rozhkovdmitrii committed Apr 17, 2022
commit 4675a2c674e5f4cf6a9eecff9e689d1c7dad1bce
2 changes: 1 addition & 1 deletion proxy/common_neon/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ def add_instruction(self, sol_tx_hash: str, sol_spent: int, steps: int, bpf: int

@dataclass
class NeonTxData:
tx_signed: bytes
tx_signed: str

2 changes: 1 addition & 1 deletion proxy/mempool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .mempool_server import MemPoolServer
from .mempool_server import PickableDataServer
from .mempool_client import MemPoolClient
from .mempool_service import MemPoolService
from .mempool_client import MemPoolClient
Expand Down
66 changes: 23 additions & 43 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,40 @@
import asyncio
from logged_groups import logged_group
from multiprocessing import Pool, Queue
import queue
from concurrent.futures import ProcessPoolExecutor
import time

from ..common_neon.data import NeonTxData


@logged_group("neon.MemPool")
class MemPool:

POOL_PROC_COUNT = 3
TX_QUEUE_TIMEOUT_SEC = 0.04
BREAK_PROC_INVOCATION = 0
POOL_PROC_COUNT = 8

def __init__(self):
self._pool = None
self._tx_queue = Queue()

def _process_init(self):
self._pool = Pool(processes=self.POOL_PROC_COUNT)
self._tx_queue = asyncio.Queue()
self._pool = ProcessPoolExecutor(self.POOL_PROC_COUNT)
self._neon_tx_futures = set()
self._event_loop = asyncio.get_event_loop()
self._event_loop.create_task(self.do_mempool_stuff())
self._send_tx_futures = list()

def on_eth_send_raw_transaction(self, neon_tx_data: NeonTxData):
self._tx_queue.put(neon_tx_data)

def error_callback(self, error):
self.error("Failed to invoke on worker process: ", error)

def on_eth_send_raw_transaction_callback(self, result):
self.debug(f"Processing result: {result}")
self._tx_queue.put_nowait(neon_tx_data)

@staticmethod
def _on_eth_send_raw_transaction_impl(neon_tx_data: NeonTxData) -> bool:
def on_eth_send_raw_transaction_impl(neon_tx_data: NeonTxData) -> bool:
print(f"neon_tx_data: {neon_tx_data}")
time.sleep(0.1)
return True

def do_extras(self):
pass

def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['_pool']
return self_dict

def __setstate__(self, state):
self.__dict__.update(state)

def run(self):
self._process_init()
while True:
try:
if not self._process_queue():
break
except queue.Empty:
self.do_extras()

def _process_queue(self) -> bool:
neon_tx_data = self._tx_queue.get(block=True, timeout=self.TX_QUEUE_TIMEOUT_SEC)
if neon_tx_data == self.BREAK_PROC_INVOCATION:
return False
self._pool.apply_async(MemPool._on_eth_send_raw_transaction_impl, (neon_tx_data, ), callback=self.on_eth_send_raw_transaction_callback, error_callback=self.error_callback)
return True
async def do_mempool_stuff(self):
with self._pool as pool:
while True:
try:
neon_tx_data = await self._tx_queue.get()
pool.submit(MemPool.on_eth_send_raw_transaction_impl, neon_tx_data)
except Exception as err:
print(f"Failed to submit neon tx onto the mempool worker: {err}")

30 changes: 18 additions & 12 deletions proxy/mempool/mempool_client.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
from multiprocessing.managers import BaseManager
import socket
import pickle
import struct
from logged_groups import logged_group


from ..common_neon.data import NeonTxData


@logged_group("neon.Proxy")
class MemPoolClient:

def __init__(self, host: str, port: int):

self.info(f"Initialize MemPoolClient connecting to: {port} at: {host}")
self._connection = socket.create_connection((host, port))

class MemPoolManager(BaseManager):
def __init__(self):
super(MemPoolManager, self).__init__(address=(host, port), authkey=b'abracadabra')
self.register("MemPool")

self._mempool_manager = MemPoolManager()
self._mempool_manager.connect()
self._mempool = self._mempool_manager.MemPool()

def on_eth_send_raw_transaction(self, neon_tx_data):
def on_eth_send_raw_transaction(self, neon_tx_data: NeonTxData):
try:
self._mempool.on_eth_send_raw_transaction(neon_tx_data)
self.debug(f"Send transaction: {neon_tx_data}")
payload = self.decode_neon_tx_data(neon_tx_data)
self._connection.send(payload)
except BaseException as err:
self.error(f"Failed to send raw transaction onto mempool: {err}")
raise Exception("Failed to send to the mempool")

def decode_neon_tx_data(self, neon_tx_data: NeonTxData):
data = pickle.dumps(neon_tx_data)
data_len = len(data)
packed_len = struct.pack("!I", data_len)
payload = packed_len + data
return payload
64 changes: 42 additions & 22 deletions proxy/mempool/mempool_server.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,67 @@
import multiprocessing as mp
import os
import signal
from multiprocessing.managers import BaseManager
from abc import ABC, abstractmethod
import asyncio
import socket
import pickle
import struct
from typing import Any
from logged_groups import logged_group

from ..common_neon.data import NeonTxData


class MemPoolServerUser(ABC):
class PickableDataServerUser(ABC):

@abstractmethod
def on_eth_send_raw_transaction(self, neon_tx_data: NeonTxData):
def on_data_received(self, data: Any):
"""Gets neon_tx_data from the neon rpc api service worker"""


@logged_group("neon.MemPool")
class MemPoolServer(ABC):
class PickableDataServer(ABC):

QUEUE_TIMEOUT_SEC = 0.4
BREAK_PROC_INVOCATION = 0
JOIN_PROC_TIMEOUT_SEC = 5

def __init__(self, *, user: MemPoolServerUser, host: str, port: int):
def __init__(self, *, user: PickableDataServerUser, host: str, port: int):
self._user = user
self._port = port
self._host = host
self._timeout = self.QUEUE_TIMEOUT_SEC

class MemPoolManager(BaseManager):
pass

MemPoolManager.register("MemPool", lambda: self._user)
self._mempool_manager = MemPoolManager(address=(host, port), authkey=b'abracadabra')
self._mempool_server = self._mempool_manager.get_server()
self._mempool_server_process = mp.Process(target=self._mempool_server.serve_forever, name="mempool_listen_proc")

pid = os.getpid()
signal.signal(signal.SIGINT, lambda sif, frame: self.finish() if os.getpid() == pid else 0)

def start(self):
self.info(f"Start listen on: {self._port} at: {self._host}")
self._mempool_server_process.start()

def finish(self):
self._mempool_server_process.terminate()

async def handle_client(self, client):
loop = asyncio.get_event_loop()
peer_name = client.getpeername()
self.debug(f"Got new incoming connection: {peer_name}")
while True:
try:
len_packed: bytes = await loop.sock_recv(client, 4)
if len(len_packed) == 0:
break
# TODO: all the data can be received by parts, handle it
payload_len_data = struct.unpack("!I", len_packed[:4])[0]
payload = await loop.sock_recv(client, payload_len_data)
data = pickle.loads(payload)
self._user.on_data_received(data)
response = pickle.dumps({"data": data, "status": "ok"})
await loop.sock_sendall(client, response)
except ConnectionResetError:
self.error(f"Client connection: {peer_name} - has been interrupted")
break
client.close()

async def run_server(self):
self.info(f"Listen port: {self._port} on: {self._host}")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self._host, self._port))
server.listen(8)
server.setblocking(False)

loop = asyncio.get_event_loop()
while True:
client, _ = await loop.sock_accept(server)
loop.create_task(self.handle_client(client))
28 changes: 13 additions & 15 deletions proxy/mempool/mempool_service.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
from logged_groups import logged_group
from multiprocessing import Process
import asyncio

from .mempool_server import MemPoolServer, MemPoolServerUser
from .mempool_server import PickableDataServer, PickableDataServerUser
from .mem_pool import MemPool

from ..common_neon.data import NeonTxData

from typing import Any

@logged_group("neon.MemPool")
class MemPoolService(MemPoolServerUser):
class MemPoolService(PickableDataServerUser):

MEMPOOL_SERVICE_PORT = 9091
MEMPOOL_SERVICE_HOST = "127.0.0.1"
MEMPOOL_SERVICE_HOST = "0.0.0.0"

def __init__(self):
self._mempool_server = MemPoolServer(user=self, host=self.MEMPOOL_SERVICE_HOST, port=self.MEMPOOL_SERVICE_PORT)
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)

self._mempool_server = PickableDataServer(user=self, host=self.MEMPOOL_SERVICE_HOST, port=self.MEMPOOL_SERVICE_PORT)
self._mempool = MemPool()
self._mempool_proc = Process(target=self.run_mempool)

def start(self):
self._mempool_server.start()
self._mempool_proc.start()

def on_eth_send_raw_transaction(self, neon_tx_data: NeonTxData):
self._mempool.on_eth_send_raw_transaction(neon_tx_data)
self.info("Run until complete")
self.event_loop.run_until_complete(self._mempool_server.run_server())

def run_mempool(self):
self._mempool.run()
def on_data_received(self, data: Any):
self._mempool.on_eth_send_raw_transaction(data)
7 changes: 4 additions & 3 deletions proxy/neon_rpc_api_model/neon_rpc_api_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from web3.auto import w3

from ..common_neon.address import EthereumAddress
from ..common_neon.emulator_interactor import call_emulated, call_trx_emulated
from ..common_neon.emulator_interactor import call_emulated
from ..common_neon.errors import EthereumError, InvalidParamError, PendingTxError
from ..common_neon.estimate import GasEstimate
from ..common_neon.eth_proto import Trx as EthTrx
from ..common_neon.keys_storage import KeyStorage
from ..common_neon.solana_interactor import SolanaInteractor
from ..common_neon.utils import SolanaBlockInfo
from ..common_neon.data import NeonTxPrecheckResult
from ..common_neon.data import NeonTxPrecheckResult, NeonTxData
from ..environment import SOLANA_URL, PP_SOLANA_URL, PYTH_MAPPING_ACCOUNT, NEON_EVM_VERSION, NEON_EVM_REVISION, \
CHAIN_ID, neon_cli, EVM_STEP_COUNT
from ..memdb.memdb import MemDB
Expand Down Expand Up @@ -451,7 +451,8 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str:
tx_sender.execute(neon_tx_precheck_result)

self._stat_tx_success()
self._mempool_client.on_eth_send_raw_transaction(eth_signature)
neon_tx_data = NeonTxData(tx_signed=rawTrx)
self._mempool_client.on_eth_send_raw_transaction(neon_tx_data)
return eth_signature

except PendingTxError as err:
Expand Down