Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
36173d3
Get rid of queue based service
rozhkovdmitrii Apr 16, 2022
30e11cf
Move all data types into data.py
rozhkovdmitrii Apr 16, 2022
cbfc8bb
Simplify start mechanism
rozhkovdmitrii Apr 16, 2022
5444085
Update neon_rpc_api_model
rozhkovdmitrii Apr 16, 2022
d956dc7
mempool init
rozhkovdmitrii Apr 16, 2022
a2ada65
update mempool
rozhkovdmitrii Apr 16, 2022
6e810fe
Get rid of second getting price int test_integration_success_read_price
rozhkovdmitrii Apr 16, 2022
4675a2c
Bring processing onto asyncio
rozhkovdmitrii Apr 17, 2022
76b73d7
Implement asyncio event loop on the MemPool
rozhkovdmitrii Apr 17, 2022
dc11a9d
rename mempool_server.py
rozhkovdmitrii Apr 17, 2022
89d63c8
Correct some things
rozhkovdmitrii Apr 17, 2022
6ae513b
Spit and polish
rozhkovdmitrii Apr 17, 2022
4bfd337
Spit and polish
rozhkovdmitrii Apr 17, 2022
7d49886
Merge branch '712-mempool' into 712-restructure-processing
rozhkovdmitrii Apr 17, 2022
817df22
Spit and polish
rozhkovdmitrii Apr 18, 2022
39ad9c9
Move pickable_data server into utils
rozhkovdmitrii Apr 18, 2022
22b0ac7
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 21, 2022
8b9a821
Introduce a variety of approaches to transfer a pickable data, config
rozhkovdmitrii Apr 21, 2022
017bcde
Spit on MemPool and polish a little
rozhkovdmitrii Apr 21, 2022
1df5786
Move transaction sending functionality
rozhkovdmitrii Apr 21, 2022
3b725ca
.
rozhkovdmitrii Apr 21, 2022
d4b68d7
Bring emulating beck to transaction_validator
rozhkovdmitrii Apr 21, 2022
df68f14
Resolve some remarks made by @a_falaleev
rozhkovdmitrii Apr 21, 2022
406aa6b
spit and polish
rozhkovdmitrii Apr 21, 2022
88376f8
Rename neon_tx_exec_cfg
rozhkovdmitrii Apr 21, 2022
971cb11
Turn down full test suite
rozhkovdmitrii Apr 22, 2022
9dc5d44
Save changes as they are
rozhkovdmitrii Apr 23, 2022
2c0f4eb
Fix pickable data client
rozhkovdmitrii Apr 23, 2022
b4e33e5
Spit ans polish
rozhkovdmitrii Apr 25, 2022
153f3a3
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
634b358
Merge branch '712-mempool' into 713-make-get_receipt-working
rozhkovdmitrii Apr 25, 2022
2464609
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
18af023
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 26, 2022
779019f
Get rid of endless request processing
rozhkovdmitrii Apr 26, 2022
f0d9f1a
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
6dfc5d1
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
dcf699b
Just reraise exception when failed to send
rozhkovdmitrii Apr 29, 2022
e78153b
Just try to fix logs
rozhkovdmitrii Apr 29, 2022
a467a3d
Logging logs
rozhkovdmitrii Apr 29, 2022
60b713a
Provide some extra logging
rozhkovdmitrii Apr 30, 2022
925d6b2
Just add address to eth_getLogs tests filter
rozhkovdmitrii May 1, 2022
f3c99bb
Fix test_neon_tx_sender.py
rozhkovdmitrii May 1, 2022
cdcca71
enable FTS
rozhkovdmitrii May 1, 2022
5366a53
revert some changes
rozhkovdmitrii May 13, 2022
c6b6935
fix one error
rozhkovdmitrii May 13, 2022
07ccac2
fix counters store
rozhkovdmitrii May 13, 2022
a3bf585
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 16, 2022
9f37b0f
Lowercase account before getting transaction count
rozhkovdmitrii May 17, 2022
bd668e1
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
62d553f
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
0b6686b
Rollback some changes
rozhkovdmitrii May 18, 2022
3544bd2
Rollback some changes
rozhkovdmitrii May 18, 2022
260e8f7
Spit and polish
rozhkovdmitrii May 19, 2022
8234bb8
Spit and polish
rozhkovdmitrii May 19, 2022
0a13155
Spit and polish
rozhkovdmitrii May 19, 2022
dbf5ca1
Spit and polish
rozhkovdmitrii May 19, 2022
388f028
Spit and polish
rozhkovdmitrii May 19, 2022
8e7eaf4
Spit and polish
rozhkovdmitrii May 19, 2022
93051f0
Spit and polish
rozhkovdmitrii May 19, 2022
728880a
Spit and polish
rozhkovdmitrii May 19, 2022
3d71178
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Introduce a variety of approaches to transfer a pickable data, config
  • Loading branch information
rozhkovdmitrii committed Apr 21, 2022
commit 8b9a8214bcdfef02e72f566af3151393587dfb5e
27 changes: 27 additions & 0 deletions proxy/common_neon/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from abc import ABC, abstractmethod
from typing import Optional
import os


class IConfig(ABC):

@abstractmethod
def get_solana_url(self) -> Optional[str]:
"""Gets the predefinded solana url"""

@abstractmethod
def get_evm_count(self) -> Optional[int]:
"""Gets the evm count"""


class Config(IConfig):

def __init__(self):
from ..environment import read_elf_params, ELF_PARAMS
read_elf_params(ELF_PARAMS)

def get_solana_url(self) -> Optional[str]:
return os.environ.get("SOLANA_URL", "http://localhost:8899")

def get_evm_count(self) -> Optional[int]:
return int(os.environ.get("EVM_STEP_COUNT", 750))
12 changes: 10 additions & 2 deletions proxy/common_neon/data.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Any
from .eth_proto import Trx as NeonTx


@dataclass
class NeonTxPrecheckResult:
is_underpriced_tx_without_chainid: bool
class NeonTxCfg:
is_without_chainid: bool
steps_executed: int


@dataclass
class MemPoolTxCfg:
neon_tx: NeonTx
neon_tx_cfg: NeonTxCfg
emulating_result: NeonEmulatingResult


Expand Down
3 changes: 2 additions & 1 deletion proxy/common_neon/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .utils import *

from .pickable_data_server import PickableDataServer, PickableDataServerUser, PickableDataClient
from .pickable_data_server import AddrPickableDataSrv, PipePickableDataSrv, PickableDataServerUser, \
AddrPickableDataClient, PipePickableDataClient

154 changes: 111 additions & 43 deletions proxy/common_neon/utils/pickable_data_server.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,152 @@
from typing import Any, Tuple
from abc import ABC, abstractmethod

import asyncio
from asyncio import StreamReader, StreamWriter
import socket
import pickle
import struct
from typing import Any
from logged_groups import logged_group


class PickableDataServerUser(ABC):

@abstractmethod
def on_data_received(self, data: Any):
def on_data_received(self, data: Any) -> Any:
"""Gets neon_tx_data from the neon rpc api service worker"""


def encode_pickable(object) -> bytes:
data = pickle.dumps(object)
len_data = struct.pack("!I", len(data))
return len_data + data


@logged_group("neon.MemPool")
class PickableDataServer(ABC):

def __init__(self, *, user: PickableDataServerUser, host: str, port: int):
def __init__(self, *, user: PickableDataServerUser):
self._user = user
self._port = port
self._host = host
asyncio.get_event_loop().create_task(self.run_server())

@abstractmethod
async def run_server(self):
assert False

async def handle_client(self, client):
loop = asyncio.get_event_loop()
peer_name = client.getpeername()
self.debug(f"Got new incoming connection: {peer_name}")
async def handle_client(self, reader: StreamReader, writer: StreamWriter):
while True:
try:
len_packed: bytes = await loop.sock_recv(client, 4)
if len(len_packed) == 0:
break
# TODO: all the data can be received by parts, handle it
payload_len_data = struct.unpack("!I", len_packed[:4])[0]
payload = await loop.sock_recv(client, payload_len_data)
data = pickle.loads(payload)
self._user.on_data_received(data)
data = await self._recv_pickable_data(reader)
result = self._user.on_data_received(data)
result_data = encode_pickable(result)
writer.write(result_data)
await writer.drain()
except ConnectionResetError:
self.error(f"Client connection: {peer_name} - has been interrupted")
self.error(f"Client connection has been closed")
break
except Exception as err:
self.error(f"Failed to receive data over: {peer_name} - err: {err}")
continue
client.close()
self.error(f"Failed to receive data err: {err}, type: {type(err)}")
break

async def _recv_pickable_data(self, reader: StreamReader):
len_packed: bytes = await reader.readexactly(4)
if len(len_packed) == 0:
raise ConnectionResetError()
payload_len_data = struct.unpack("!I", len_packed)[0]
payload = await reader.readexactly(payload_len_data)
data = pickle.loads(payload)

return data


class AddrPickableDataSrv(PickableDataServer):

def __init__(self, *, user: PickableDataServerUser, address: Tuple[str, int]):
self._address = address
PickableDataServer.__init__(self, user=user)

async def run_server(self):
self.info(f"Listen port: {self._port} on: {self._host}")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self._host, self._port))
server.listen(8)
server.setblocking(False)
host, port = self._address
self.info(f"Listen port: {port} on: {host}")
await asyncio.start_server(self.handle_client, host, port)

loop = asyncio.get_event_loop()
while True:
client, _ = await loop.sock_accept(server)
loop.create_task(self.handle_client(client))

class PipePickableDataSrv(PickableDataServer):

def __init__(self, *, user: PickableDataServerUser, srv_sock: socket.socket):
self._srv_sock = srv_sock
PickableDataServer.__init__(self, user=user)

async def run_server(self):
print("run_server_by_conn")
reader, writer = await asyncio.streams.open_connection(sock=self._srv_sock)
print("Got reader, writer")
await self.handle_client(reader, writer)


@logged_group("neon.Proxy")
class PickableDataClient:

def __init__(self, host: str, port: int):
CONNECTION_TIMEOUT_SEC = 5

self.info(f"Initialize PickableDataClient connecting to: {port} at: {host}")
self._connection = socket.create_connection((host, port))
def __init__(self):
self._client_sock = None

def send_data(self, pickable_data: Any):
def _set_client_sock(self, client_sock: socket.socket):
self._client_sock = client_sock
self._client_sock.setblocking(False)
self._client_sock.settimeout(self.CONNECTION_TIMEOUT_SEC)

def send_data(self, pickable_object: Any):
try:
payload = self._encode_pickable_data(pickable_data)
self._connection.send(payload)
payload = encode_pickable(pickable_object)
self._client_sock.send(payload)
len_packed: bytes = self._client_sock.recv(4)
data_len = struct.unpack("!I", len_packed)[0]
data = self._client_sock.recv(data_len)
if not data:
return None
result = pickle.loads(data)
return result
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def _encode_pickable_data(self, pickable_data: Any):
data = pickle.dumps(pickable_data)
data_len = len(data)
packed_len = struct.pack("!I", data_len)
payload = packed_len + data
return payload
async def send_data_async(self, pickable_object):
reader, writer = await asyncio.streams.open_connection(sock=self._client_sock)
try:
payload = encode_pickable(pickable_object)
writer.write(payload)
await writer.drain()
len_packed: bytes = await reader.readexactly(4)
if not len_packed:
return None
data_len = struct.unpack("!I", len_packed)[0]
data = await reader.readexactly(data_len)
if not data:
return None
result = pickle.loads(data)
return result
except BaseException as err:
self.error(f"Failed to send data: {err}")
raise Exception("Failed to send pickable data")

def __del__(self):
self._connection.close()
self._client_sock.close()


class PipePickableDataClient(PickableDataClient):

def __init__(self, client_sock: socket.socket):
PickableDataClient.__init__(self)
self._set_client_sock(client_sock=client_sock)


class AddrPickableDataClient(PickableDataClient):

def __init__(self, addr: Tuple[str, int]):
PickableDataClient.__init__(self)
host, port = addr
client_sock = socket.create_connection((host, port))
self._set_client_sock(client_sock=client_sock)

5 changes: 2 additions & 3 deletions proxy/common_neon/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

from eth_utils import big_endian_to_int

#TODO: move it out from here
from ...environment import EVM_LOADER_ID

from ..eth_proto import Trx as EthTx


Expand Down Expand Up @@ -129,6 +126,8 @@ def fill_block_info(self, block: SolanaBlockInfo):
rec['blockNumber'] = hex(block.slot)

def decode(self, neon_sign: str, tx: {}, ix_idx=-1) -> NeonTxResultInfo:
# TODO: move it out from here
from ...environment import EVM_LOADER_ID
self._set_defaults()
meta_ixs = tx['meta']['innerInstructions']
msg = tx['transaction']['message']
Expand Down