Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
36173d3
Get rid of queue based service
rozhkovdmitrii Apr 16, 2022
30e11cf
Move all data types into data.py
rozhkovdmitrii Apr 16, 2022
cbfc8bb
Simplify start mechanism
rozhkovdmitrii Apr 16, 2022
5444085
Update neon_rpc_api_model
rozhkovdmitrii Apr 16, 2022
d956dc7
mempool init
rozhkovdmitrii Apr 16, 2022
a2ada65
update mempool
rozhkovdmitrii Apr 16, 2022
6e810fe
Get rid of second getting price int test_integration_success_read_price
rozhkovdmitrii Apr 16, 2022
4675a2c
Bring processing onto asyncio
rozhkovdmitrii Apr 17, 2022
76b73d7
Implement asyncio event loop on the MemPool
rozhkovdmitrii Apr 17, 2022
dc11a9d
rename mempool_server.py
rozhkovdmitrii Apr 17, 2022
89d63c8
Correct some things
rozhkovdmitrii Apr 17, 2022
6ae513b
Spit and polish
rozhkovdmitrii Apr 17, 2022
4bfd337
Spit and polish
rozhkovdmitrii Apr 17, 2022
7d49886
Merge branch '712-mempool' into 712-restructure-processing
rozhkovdmitrii Apr 17, 2022
817df22
Spit and polish
rozhkovdmitrii Apr 18, 2022
39ad9c9
Move pickable_data server into utils
rozhkovdmitrii Apr 18, 2022
22b0ac7
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 21, 2022
8b9a821
Introduce a variety of approaches to transfer a pickable data, config
rozhkovdmitrii Apr 21, 2022
017bcde
Spit on MemPool and polish a little
rozhkovdmitrii Apr 21, 2022
1df5786
Move transaction sending functionality
rozhkovdmitrii Apr 21, 2022
3b725ca
.
rozhkovdmitrii Apr 21, 2022
d4b68d7
Bring emulating beck to transaction_validator
rozhkovdmitrii Apr 21, 2022
df68f14
Resolve some remarks made by @a_falaleev
rozhkovdmitrii Apr 21, 2022
406aa6b
spit and polish
rozhkovdmitrii Apr 21, 2022
88376f8
Rename neon_tx_exec_cfg
rozhkovdmitrii Apr 21, 2022
971cb11
Turn down full test suite
rozhkovdmitrii Apr 22, 2022
9dc5d44
Save changes as they are
rozhkovdmitrii Apr 23, 2022
2c0f4eb
Fix pickable data client
rozhkovdmitrii Apr 23, 2022
b4e33e5
Spit ans polish
rozhkovdmitrii Apr 25, 2022
153f3a3
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
634b358
Merge branch '712-mempool' into 713-make-get_receipt-working
rozhkovdmitrii Apr 25, 2022
2464609
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
18af023
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 26, 2022
779019f
Get rid of endless request processing
rozhkovdmitrii Apr 26, 2022
f0d9f1a
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
6dfc5d1
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
dcf699b
Just reraise exception when failed to send
rozhkovdmitrii Apr 29, 2022
e78153b
Just try to fix logs
rozhkovdmitrii Apr 29, 2022
a467a3d
Logging logs
rozhkovdmitrii Apr 29, 2022
60b713a
Provide some extra logging
rozhkovdmitrii Apr 30, 2022
925d6b2
Just add address to eth_getLogs tests filter
rozhkovdmitrii May 1, 2022
f3c99bb
Fix test_neon_tx_sender.py
rozhkovdmitrii May 1, 2022
cdcca71
enable FTS
rozhkovdmitrii May 1, 2022
5366a53
revert some changes
rozhkovdmitrii May 13, 2022
c6b6935
fix one error
rozhkovdmitrii May 13, 2022
07ccac2
fix counters store
rozhkovdmitrii May 13, 2022
a3bf585
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 16, 2022
9f37b0f
Lowercase account before getting transaction count
rozhkovdmitrii May 17, 2022
bd668e1
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
62d553f
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
0b6686b
Rollback some changes
rozhkovdmitrii May 18, 2022
3544bd2
Rollback some changes
rozhkovdmitrii May 18, 2022
260e8f7
Spit and polish
rozhkovdmitrii May 19, 2022
8234bb8
Spit and polish
rozhkovdmitrii May 19, 2022
0a13155
Spit and polish
rozhkovdmitrii May 19, 2022
dbf5ca1
Spit and polish
rozhkovdmitrii May 19, 2022
388f028
Spit and polish
rozhkovdmitrii May 19, 2022
8e7eaf4
Spit and polish
rozhkovdmitrii May 19, 2022
93051f0
Spit and polish
rozhkovdmitrii May 19, 2022
728880a
Spit and polish
rozhkovdmitrii May 19, 2022
3d71178
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 25, 2022
92aa0f6
Renaming
rozhkovdmitrii May 31, 2022
01544a3
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 31, 2022
080b46a
Merge branch '712-mempool' into 712-mempool-renaming
rozhkovdmitrii Jun 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Save changes as they are
  • Loading branch information
rozhkovdmitrii committed Apr 23, 2022
commit 9dc5d4414833d8115d99e3beb5a15e2f6087594a
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ steps:
- label: ":terraform: build infrastructure"
key: "create_infrastructure"
if: &is_fts_enabled |
(build.pull_request.base_branch == "712-mempool" && !build.pull_request.draft) ||
(build.pull_request.base_branch == "develop" && !build.pull_request.draft) ||
(build.source == "trigger_job" && build.env("NEON_EVM_FULL_TEST_SUITE") == "true")
agents:
queue: "testing"
Expand Down
8 changes: 4 additions & 4 deletions aio_mempool_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ def async_addr_serv_proc():
srv_proc.start()

client = AddrPickableDataClient(('localhost', 9091))
result = client.send_data({"a": 1})
print(f"Result: {result}")
result = client.send_data({"a": 2})
print(f"Result: {result}")
for i in range(100000):
result = client.send_data({"a": i})
print(f"Result: {result}")

srv_proc.kill()

#pipe_interaction_sample()
Expand Down
6 changes: 0 additions & 6 deletions proxy/common_neon/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,3 @@ def __init__(self, neon_tx_hash: str, neon_income: int, tx_type: str, is_cancele

def add_instruction(self, sol_tx_hash: str, sol_spent: int, steps: int, bpf: int) -> None:
self.instructions.append((sol_tx_hash, sol_spent, steps, bpf))


@dataclass
class NeonTxData:
tx_signed: str

21 changes: 9 additions & 12 deletions proxy/common_neon/utils/pickable_data_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class PickableDataServerUser(ABC):

@abstractmethod
def on_data_received(self, data: Any) -> Any:
async def on_data_received(self, data: Any) -> Any:
"""Gets neon_tx_data from the neon rpc api service worker"""


Expand All @@ -37,23 +37,26 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter):
while True:
try:
data = await self._recv_pickable_data(reader)
result = self._user.on_data_received(data)
result = await self._user.on_data_received(data)
result_data = encode_pickable(result)
writer.write(result_data)
await writer.drain()
except ConnectionResetError:
self.error(f"Client connection has been closed")
break
except asyncio.exceptions.IncompleteReadError as err:
self.error(f"Incomplete read error: {err}")
break
except Exception as err:
self.error(f"Failed to receive data err: {err}, type: {type(err)}")
break

async def _recv_pickable_data(self, reader: StreamReader):
len_packed: bytes = await reader.readexactly(4)
len_packed: bytes = await reader.read(4)
if len(len_packed) == 0:
raise ConnectionResetError()
payload_len_data = struct.unpack("!I", len_packed)[0]
payload = await reader.readexactly(payload_len_data)
payload = await reader.read(payload_len_data)
data = pickle.loads(payload)

return data
Expand All @@ -78,29 +81,25 @@ def __init__(self, *, user: PickableDataServerUser, srv_sock: socket.socket):
PickableDataServer.__init__(self, user=user)

async def run_server(self):
print("run_server_by_conn")
reader, writer = await asyncio.streams.open_connection(sock=self._srv_sock)
print("Got reader, writer")
await self.handle_client(reader, writer)


@logged_group("neon.Proxy")
class PickableDataClient:

CONNECTION_TIMEOUT_SEC = 5
CONNECTION_TIMEOUT_SEC = 600

def __init__(self):
self._client_sock = None

def _set_client_sock(self, client_sock: socket.socket):
self._client_sock = client_sock
self._client_sock.setblocking(False)
self._client_sock.settimeout(self.CONNECTION_TIMEOUT_SEC)

def send_data(self, pickable_object: Any):
try:
payload = encode_pickable(pickable_object)
self._client_sock.send(payload)
sent = self._client_sock.send(payload)
len_packed: bytes = self._client_sock.recv(4)
data_len = struct.unpack("!I", len_packed)[0]
data = self._client_sock.recv(data_len)
Expand Down Expand Up @@ -131,8 +130,6 @@ async def send_data_async(self, pickable_object):
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def __del__(self):
self._client_sock.close()


class PipePickableDataClient(PickableDataClient):
Expand Down
5 changes: 3 additions & 2 deletions proxy/common_neon/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from eth_utils import big_endian_to_int

#TODO: move it out from here
from ...environment import EVM_LOADER_ID


from ..eth_proto import Trx as EthTx

Expand Down Expand Up @@ -133,9 +133,10 @@ def decode(self, neon_sign: str, tx: {}, ix_idx=-1) -> NeonTxResultInfo:
meta_ixs = tx['meta']['innerInstructions']
msg = tx['transaction']['message']
accounts = msg['accountKeys']

from ...environment import EVM_LOADER_ID
for inner_ix in meta_ixs:
ix_idx = inner_ix['index']

for event in inner_ix['instructions']:
if accounts[event['programIdIndex']] == EVM_LOADER_ID:
log = base58.b58decode(event['data'])
Expand Down
78 changes: 78 additions & 0 deletions proxy/mempool/executor_mng.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import dataclasses
import socket
from collections import deque
from typing import List, Tuple
from logged_groups import logged_group

from ..common_neon.config import IConfig
from ..common_neon.utils import PipePickableDataClient

from .mempool_executor import MemPoolExecutor
from .mempool_api import ExecTxRequest


class MpExecutorClient(PipePickableDataClient):

def __init__(self, client_sock: socket.socket):
PipePickableDataClient.__init__(self, client_sock=client_sock)

async def send_tx_request(self, mempool_tx_request: ExecTxRequest):
return await self.send_data_async(mempool_tx_request)


@logged_group("neon.MemPool")
class ExecutorMng:

BRING_BACK_EXECUTOR_TIMEOUT_SEC = 120

@dataclasses.dataclass
class ExecutorInfo:
executor: MemPoolExecutor
client: MpExecutorClient
id: int

def __init__(self, executor_count: int, config: IConfig):
self.info(f"Initialize executor mng with executor_count: {executor_count}")
self._available_pool = deque()
self._busy_pool = set()
self._executors: List[ExecutorMng.ExecutorInfo] = list()
for i in range(executor_count):
executor_info = ExecutorMng._create_executor(i, config)
self._executors.append(executor_info)
self._available_pool.appendleft(i)
executor_info.executor.start()

def has_available(self) -> bool:
return len(self._available_pool) > 0

def get_executor(self) -> Tuple[int, MpExecutorClient]:
executor_id = self._available_pool.pop()
self._busy_pool.add(executor_id)
executor_info = self._executors[executor_id]
return executor_id, executor_info.client

def on_no_liquidity(self, executor_id: int):
asyncio.get_event_loop().create_task(self._release_executor_later(executor_id))

async def _release_executor_later(self, executor_id: int):
await asyncio.sleep(ExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC)
self.release_executor(executor_id)

def release_executor(self, executor_id: int):
self.debug(f"Release executor: {executor_id}")
self._busy_pool.remove(executor_id)
self._available_pool.appendleft(executor_id)

@staticmethod
def _create_executor(executor_id: int, config: IConfig) -> ExecutorInfo:
client_sock, srv_sock = socket.socketpair()
executor = MemPoolExecutor(executor_id, srv_sock, config)
client = MpExecutorClient(client_sock)
return ExecutorMng.ExecutorInfo(executor=executor, client=client, id=executor_id)

def __del__(self):
for executor_info in self._executors:
executor_info.executor.kill()
self._busy_pool.clear()
self._available_pool.clear()
109 changes: 98 additions & 11 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,117 @@
import asyncio
import time
from typing import List, Tuple
from logged_groups import logged_group
from concurrent.futures import ProcessPoolExecutor

import bisect
from ..common_neon.config import IConfig

from .mempool_api import MemPoolTxRequest
from .mempool_tx_executor import MemPoolTxExecutor
from .mempool_api import ExecTxRequest, ExecTxResultCode, ExecTxResult
from .executor_mng import ExecutorMng


@logged_group("neon.MemPool")
class MemPool:

POOL_PROC_COUNT = 8
EXECUTOR_COUNT = 8
TX_QUEUE_MAX_SIZE = 5120
TX_QUEUE_SIZE = 4096
CHECK_TASK_TIMEOUT_SEC = 0.05

def __init__(self, config: IConfig):
self._pool = ProcessPoolExecutor(self.POOL_PROC_COUNT)
self._tx_executor = MemPoolTxExecutor(config)
self._executor_mng = ExecutorMng(self.EXECUTOR_COUNT, config)
self._tx_req_queue = []
self._lock = asyncio.Lock()
self._tx_req_queue_cond = asyncio.Condition()
self._processing_tasks: List[Tuple[int, asyncio.Task, ExecTxRequest]] = []
self._process_tx_results_task = asyncio.get_event_loop().create_task(self.check_processing_tasks())
self._process_tx_queue_task = asyncio.get_event_loop().create_task(self.process_tx_queue())

def send_raw_transaction(self, mempool_tx_request: MemPoolTxRequest) -> bool:
async def send_raw_transaction(self, exec_tx_request: ExecTxRequest) -> bool:
try:
self._pool.submit(MemPool._send_raw_transaction_impl, mempool_tx_request)
#self.debug(f"Got tx {exec_tx_request.neon_tx}")
if len(self._tx_req_queue) > MemPool.TX_QUEUE_MAX_SIZE:
self._tx_req_queue = self._tx_req_queue[-MemPool.TX_QUEUE_SIZE:]
bisect.insort_left(self._tx_req_queue, exec_tx_request)
await self._kick_tx_queue()

except Exception as err:
print(f"Failed enqueue mempool_tx_request into the worker pool: {err}")
self.error(f"Failed enqueue mempool_tx_request into the worker pool: {err}")
return False
return True

@staticmethod
def _send_raw_transaction_impl(mempool_tx_request: MemPoolTxRequest) -> bool:
def _send_raw_transaction_impl(mempool_tx_request: ExecTxRequest) -> bool:
print(f"mempool_tx_request: {mempool_tx_request}")
return True

async def process_tx_queue(self):
while True:
# self.debug("Acquire condition")
async with self._tx_req_queue_cond:
# self.debug("Wait for condition")
await self._tx_req_queue_cond.wait()
if len(self._tx_req_queue) == 0:
# self.debug("Tx queue empty")
continue
if not self._executor_mng.has_available():
# self.debug("No available executor")
continue
# self.debug(f"Pop from queue: {len(self._tx_req_queue)}")
request = self._tx_req_queue.pop()
# self.debug("Poped request from queue")
self.fulfill_request(request)

def fulfill_request(self, request: ExecTxRequest):

executor_id, executor = self._executor_mng.get_executor()
self.debug(f"Fulfill request on executor: {executor_id}")
task = asyncio.get_event_loop().create_task(executor.send_data_async(request))
self._processing_tasks.append((executor_id, task, request))

async def check_processing_tasks(self):
while True:
not_finished_tasks = []
for executor_id, task, mp_request in self._processing_tasks:
if not task.done():
not_finished_tasks.append((executor_id, task, mp_request))
continue
exception = task.exception()
if exception is not None:
self.error(f"Exception during processing request: {exception} - tx will be dropped away")
self._on_request_dropped_away(mp_request)
self._executor_mng.release_executor(executor_id)
continue

result: ExecTxResult = task.result()
assert isinstance(result, ExecTxResult)
assert result.result_code != ExecTxResultCode.Dummy
await self._process_mp_result(executor_id, result, mp_request)

self._processing_tasks = not_finished_tasks
await asyncio.sleep(MemPool.CHECK_TASK_TIMEOUT_SEC)

async def _process_mp_result(self, executor_id, result, mp_request):
if result.result_code == ExecTxResultCode.Done:
self.debug(f"Execution done, request: {mp_request.signature} ")
self._on_request_done(mp_request)
self._executor_mng.release_executor(executor_id)
await self._kick_tx_queue()
elif result.result_code == ExecTxResultCode.ToBeRepeat:
self.warning(f"Request will be repeated: {mp_request.signature}")
self._executor_mng.release_executor(executor_id)
await self.send_raw_transaction(mp_request)
elif result.result_code == ExecTxResultCode.NoLiquidity:
self.warning(f"No liquidity on executor: {executor_id} - will be suspended, request: {mp_request.signature} will be repeated")
self._executor_mng.on_no_liquidity(executor_id)
await self.send_raw_transaction(mp_request)

def _on_request_done(self, tx_request: ExecTxRequest):
pass

def _on_request_dropped_away(self, tx_request: ExecTxRequest):
pass

async def _kick_tx_queue(self):
async with self._tx_req_queue_cond:
# self.debug("Notify queue extended")
self._tx_req_queue_cond.notify()
31 changes: 26 additions & 5 deletions proxy/mempool/mempool_api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any

from ..common_neon.eth_proto import Trx as NeonTx
from ..common_neon.data import NeonTxExecCfg, NeonEmulatingResult


@dataclass(order=True)
class ExecTxRequest:
signature: str
neon_tx: NeonTx = field(compare=False)
neon_tx_exec_cfg: NeonTxExecCfg = field(compare=False)
emulating_result: NeonEmulatingResult = field(compare=False)
_gas_price: int = field(compare=True, default=None)

def __post_init__(self):
"""Calculate and store content length on init"""
self._gas_price = self.neon_tx.gasPrice


class ExecTxResultCode(IntEnum):
Done = 0
ToBeRepeat = 1,
NoLiquidity = 2,
Dummy = -1


@dataclass
class MemPoolTxRequest:
neon_tx: NeonTx
neon_tx_exec_cfg: NeonTxExecCfg
emulating_result: NeonEmulatingResult
class ExecTxResult:
result_code: ExecTxResultCode
data: Any
Loading