Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
36173d3
Get rid of queue based service
rozhkovdmitrii Apr 16, 2022
30e11cf
Move all data types into data.py
rozhkovdmitrii Apr 16, 2022
cbfc8bb
Simplify start mechanism
rozhkovdmitrii Apr 16, 2022
5444085
Update neon_rpc_api_model
rozhkovdmitrii Apr 16, 2022
d956dc7
mempool init
rozhkovdmitrii Apr 16, 2022
a2ada65
update mempool
rozhkovdmitrii Apr 16, 2022
6e810fe
Get rid of second getting price int test_integration_success_read_price
rozhkovdmitrii Apr 16, 2022
4675a2c
Bring processing onto asyncio
rozhkovdmitrii Apr 17, 2022
76b73d7
Implement asyncio event loop on the MemPool
rozhkovdmitrii Apr 17, 2022
dc11a9d
rename mempool_server.py
rozhkovdmitrii Apr 17, 2022
89d63c8
Correct some things
rozhkovdmitrii Apr 17, 2022
6ae513b
Spit and polish
rozhkovdmitrii Apr 17, 2022
4bfd337
Spit and polish
rozhkovdmitrii Apr 17, 2022
7d49886
Merge branch '712-mempool' into 712-restructure-processing
rozhkovdmitrii Apr 17, 2022
817df22
Spit and polish
rozhkovdmitrii Apr 18, 2022
39ad9c9
Move pickable_data server into utils
rozhkovdmitrii Apr 18, 2022
22b0ac7
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 21, 2022
8b9a821
Introduce a variety of approaches to transfer a pickable data, config
rozhkovdmitrii Apr 21, 2022
017bcde
Spit on MemPool and polish a little
rozhkovdmitrii Apr 21, 2022
1df5786
Move transaction sending functionality
rozhkovdmitrii Apr 21, 2022
3b725ca
.
rozhkovdmitrii Apr 21, 2022
d4b68d7
Bring emulating beck to transaction_validator
rozhkovdmitrii Apr 21, 2022
df68f14
Resolve some remarks made by @a_falaleev
rozhkovdmitrii Apr 21, 2022
406aa6b
spit and polish
rozhkovdmitrii Apr 21, 2022
88376f8
Rename neon_tx_exec_cfg
rozhkovdmitrii Apr 21, 2022
971cb11
Turn down full test suite
rozhkovdmitrii Apr 22, 2022
9dc5d44
Save changes as they are
rozhkovdmitrii Apr 23, 2022
2c0f4eb
Fix pickable data client
rozhkovdmitrii Apr 23, 2022
b4e33e5
Spit ans polish
rozhkovdmitrii Apr 25, 2022
153f3a3
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
634b358
Merge branch '712-mempool' into 713-make-get_receipt-working
rozhkovdmitrii Apr 25, 2022
2464609
Move executor_mng out from MemPool
rozhkovdmitrii Apr 25, 2022
18af023
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii Apr 26, 2022
779019f
Get rid of endless request processing
rozhkovdmitrii Apr 26, 2022
f0d9f1a
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
6dfc5d1
Just reraise exception when failed to send
rozhkovdmitrii Apr 26, 2022
dcf699b
Just reraise exception when failed to send
rozhkovdmitrii Apr 29, 2022
e78153b
Just try to fix logs
rozhkovdmitrii Apr 29, 2022
a467a3d
Logging logs
rozhkovdmitrii Apr 29, 2022
60b713a
Provide some extra logging
rozhkovdmitrii Apr 30, 2022
925d6b2
Just add address to eth_getLogs tests filter
rozhkovdmitrii May 1, 2022
f3c99bb
Fix test_neon_tx_sender.py
rozhkovdmitrii May 1, 2022
cdcca71
enable FTS
rozhkovdmitrii May 1, 2022
5366a53
revert some changes
rozhkovdmitrii May 13, 2022
c6b6935
fix one error
rozhkovdmitrii May 13, 2022
07ccac2
fix counters store
rozhkovdmitrii May 13, 2022
a3bf585
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 16, 2022
9f37b0f
Lowercase account before getting transaction count
rozhkovdmitrii May 17, 2022
bd668e1
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
62d553f
Use delay to make tests passed
rozhkovdmitrii May 17, 2022
0b6686b
Rollback some changes
rozhkovdmitrii May 18, 2022
3544bd2
Rollback some changes
rozhkovdmitrii May 18, 2022
260e8f7
Spit and polish
rozhkovdmitrii May 19, 2022
8234bb8
Spit and polish
rozhkovdmitrii May 19, 2022
0a13155
Spit and polish
rozhkovdmitrii May 19, 2022
dbf5ca1
Spit and polish
rozhkovdmitrii May 19, 2022
388f028
Spit and polish
rozhkovdmitrii May 19, 2022
8e7eaf4
Spit and polish
rozhkovdmitrii May 19, 2022
93051f0
Spit and polish
rozhkovdmitrii May 19, 2022
728880a
Spit and polish
rozhkovdmitrii May 19, 2022
3d71178
Merge branch '712-mempool' into 713-send-tx-from-mempool-worker
rozhkovdmitrii May 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Just reraise exception when failed to send
  • Loading branch information
rozhkovdmitrii committed Apr 29, 2022
commit dcf699b9fcf789dcb8f8f6c75a99277dbdeed6fc
1 change: 0 additions & 1 deletion proxy/common_neon/utils/pickable_data_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter):
writer.write(result_data)
await writer.drain()
except ConnectionResetError:
self.error(f"Client connection has been closed")
break
except asyncio.exceptions.IncompleteReadError as err:
self.error(f"Incomplete read error: {err}")
Expand Down
1 change: 1 addition & 0 deletions proxy/mempool/executor_mng.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _get_executor(self) -> Tuple[int, MpExecutorClient]:
return executor_id, executor_info.client

def on_no_liquidity(self, resource_id: int):
self.debug(f"No liquidity, executor: {resource_id} - will be unblocked in: {ExecutorMng.BRING_BACK_EXECUTOR_TIMEOUT_SEC} sec")
asyncio.get_event_loop().create_task(self._release_executor_later(resource_id))

async def _release_executor_later(self, executor_id: int):
Expand Down
13 changes: 8 additions & 5 deletions proxy/mempool/mem_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ def __init__(self, executor: IMemPoolExecutor):

async def enqueue_mp_request(self, mp_request: MemPoolRequest):
tx_hash = mp_request.neon_tx.hash_signed().hex()
log_ctx = {"context": {"req_id": mp_request.req_id}}
try:
self.debug(f"Got mp_tx_request: 0x{tx_hash} to be scheduled on the mempool")
self.debug(f"Got mp_tx_request: 0x{tx_hash} to be scheduled on the mempool", extra=log_ctx)
if len(self._tx_req_queue) > MemPool.TX_QUEUE_MAX_SIZE:
self._tx_req_queue = self._tx_req_queue[-MemPool.TX_QUEUE_SIZE:]
bisect.insort_left(self._tx_req_queue, mp_request)
await self._kick_tx_queue()
except Exception as err:
self.error(f"Failed enqueue tx: {tx_hash} into queue: {err}")
self.error(f"Failed enqueue tx: {tx_hash} into queue: {err}", extra=log_ctx)

async def process_tx_queue(self):
while True:
Expand Down Expand Up @@ -76,15 +77,17 @@ async def check_processing_tasks(self):

async def _process_mp_result(self, resource_id: int, mp_result: MemPoolResult, mp_request: MemPoolRequest):
hash = "0x" + mp_request.neon_tx.hash_signed().hex()
log_ctx = {"context": {"req_id": mp_request.req_id}}
if mp_result.code == MemPoolResultCode.Done:
self.debug(f"Neon tx: {hash} - processed on executor: {resource_id} - done")
self.debug(f"Neon tx: {hash} - processed on executor: {resource_id} - done", extra=log_ctx)
self._on_request_done(mp_request)
self._executor.release_resource(resource_id)
await self._kick_tx_queue()
return
self.warning(f"Failed to process tx: {hash} - on executor: {resource_id}, status: {mp_result} - reschedule")
if mp_result.code == MemPoolResultCode.ToBeRepeat:
self.warning(f"Failed to process tx: {hash} - on executor: {resource_id}, status: {mp_result} - reschedule", extra=log_ctx)
if mp_result.code == MemPoolResultCode.BlockedAccount:
self._executor.release_resource(resource_id)
await self.enqueue_mp_request(mp_request)
await self._kick_tx_queue()
elif mp_result.code == MemPoolResultCode.NoLiquidity:
self._executor.on_no_liquidity(resource_id)
Expand Down
7 changes: 5 additions & 2 deletions proxy/mempool/mempool_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def release_resource(self, resource_id: int):

@dataclass(order=True)
class MemPoolRequest:
req_id: int
signature: str
neon_tx: NeonTx = field(compare=False)
neon_tx_exec_cfg: NeonTxExecCfg = field(compare=False)
Expand All @@ -44,8 +45,10 @@ def __post_init__(self):

class MemPoolResultCode(IntEnum):
Done = 0
ToBeRepeat = 1,
NoLiquidity = 2,
BlockedAccount = 1,
SolanaUnavailable = 2,
NoLiquidity = 3,
Unspecified = 4,
Dummy = -1


Expand Down
17 changes: 9 additions & 8 deletions proxy/mempool/mempool_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import socket
import time

from logged_groups import logged_group
from logged_groups import logged_group, logging_context

from ..common_neon.solana_interactor import SolanaInteractor
from ..common_neon.config import IConfig
Expand Down Expand Up @@ -39,13 +39,14 @@ def _init_in_proc(self):
self._solana = SolanaInteractor(self._config.get_solana_url())
self._db = MemDB(self._solana)

def execute_neon_tx(self, mempool_tx_cfg: MemPoolRequest):
try:
self.execute_neon_tx_impl(mempool_tx_cfg)
except Exception as err:
self.error(f"Failed to execute neon_tx: {err}")
return MemPoolResult(MemPoolResultCode.ToBeRepeat, None)
return MemPoolResult(MemPoolResultCode.Done, None)
def execute_neon_tx(self, mempool_request: MemPoolRequest):
with logging_context(req_id=mempool_request.req_id, exectr=self._id):
try:
self.execute_neon_tx_impl(mempool_request)
except Exception as err:
self.error(f"Failed to execute neon_tx: {err}")
return MemPoolResult(MemPoolResultCode.BlockedAccount, None)
return MemPoolResult(MemPoolResultCode.Done, None)

def execute_neon_tx_impl(self, mempool_tx_cfg: MemPoolRequest):
neon_tx = mempool_tx_cfg.neon_tx
Expand Down
29 changes: 0 additions & 29 deletions proxy/mempool/mempool_tx_executor.py

This file was deleted.

10 changes: 5 additions & 5 deletions proxy/mempool/neon_tx_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def build(self):
pass


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonCancelTxStage(NeonTxStage, abc.ABC):
NAME = 'cancelWithNonce'

Expand Down Expand Up @@ -78,7 +78,7 @@ def _create_account_with_seed(self):
return self.s.builder.create_account_with_seed_instruction(self.sol_account, self._seed, self.balance, self.size)


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonCreateAccountTxStage(NeonTxStage):
NAME = 'createNeonAccount'

Expand All @@ -98,7 +98,7 @@ def build(self):
self.tx.add(self._create_account())


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonCreateERC20TxStage(NeonTxStage, abc.ABC):
NAME = 'createERC20Account'

Expand All @@ -124,7 +124,7 @@ def build(self):
self.tx.add(self._create_erc20_account())


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonCreateContractTxStage(NeonCreateAccountWithSeedStage, abc.ABC):
NAME = 'createNeonContract'

Expand All @@ -150,7 +150,7 @@ def build(self):
self.tx.add(self._create_account())


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonResizeContractTxStage(NeonCreateAccountWithSeedStage, abc.ABC):
NAME = 'resizeNeonContract'

Expand Down
4 changes: 2 additions & 2 deletions proxy/mempool/operator_resource_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def secret_key(self) -> bytes:
return self.signer.secret_key()


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class OperatorResourceList:
# These variables are global for class, they will be initialized one time
_manager = mp.Manager()
Expand Down Expand Up @@ -282,7 +282,7 @@ def free_resource_info(self):
self._free_resource_list.append(resource.idx)


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonCreatePermAccount(NeonCreateAccountWithSeedStage, abc.ABC):
NAME = 'createPermAccount'

Expand Down
16 changes: 8 additions & 8 deletions proxy/mempool/transaction_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from .operator_resource_list import OperatorResourceInfo


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NeonTxSender:
def __init__(self, db: MemDB, solana: SolanaInteractor, eth_tx: EthTx, steps: int):
self._db = db
Expand Down Expand Up @@ -205,7 +205,7 @@ def done_account_tx_list(self, skip_create_accounts=False):
self.create_account_tx.instructions.clear()


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class BaseNeonTxStrategy(metaclass=abc.ABCMeta):
NAME = 'UNKNOWN STRATEGY'

Expand Down Expand Up @@ -259,7 +259,7 @@ def _validate_gas_limit(self):
return False


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class SimpleNeonTxSender(SolTxListSender):
def __init__(self, strategy: BaseNeonTxStrategy, *args, **kwargs):
SolTxListSender.__init__(self, *args, **kwargs)
Expand All @@ -282,7 +282,7 @@ def _on_post_send(self):
raise RuntimeError('Run out of attempts to execute transaction')


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class SimpleNeonTxStrategy(BaseNeonTxStrategy, abc.ABC):
NAME = 'CallFromRawEthereumTX'
IS_SIMPLE = True
Expand Down Expand Up @@ -329,7 +329,7 @@ def execute(self) -> (NeonTxResultInfo, [str]):
return tx_sender.neon_res, tx_sender.success_sign_list


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class IterativeNeonTxSender(SimpleNeonTxSender):
def __init__(self, *args, **kwargs):
SimpleNeonTxSender.__init__(self, *args, **kwargs)
Expand Down Expand Up @@ -443,7 +443,7 @@ def _on_post_send(self):
self._tx_list.append(self._strategy.build_tx())


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class IterativeNeonTxStrategy(BaseNeonTxStrategy, abc.ABC):
NAME = 'PartialCallOrContinueFromRawEthereumTX'
IS_SIMPLE = False
Expand Down Expand Up @@ -492,7 +492,7 @@ def execute(self) -> (NeonTxResultInfo, [str]):
return tx_sender.neon_res, tx_sender.success_sign_list


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class HolderNeonTxStrategy(IterativeNeonTxStrategy, abc.ABC):
NAME = 'ExecuteTrxFromAccountDataIterativeOrContinue'

Expand Down Expand Up @@ -531,7 +531,7 @@ def _build_preparation_tx_list(self) -> [TransactionWithComputeBudget]:
return tx_list


@logged_group("neon.Proxy")
@logged_group("neon.MemPool")
class NoChainIdNeonTxStrategy(HolderNeonTxStrategy, abc.ABC):
NAME = 'ExecuteTrxFromAccountDataIterativeOrContinueNoChainId'

Expand Down
9 changes: 4 additions & 5 deletions proxy/neon_rpc_api_model/neon_rpc_api_model.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import json
import multiprocessing
import socket
import traceback
from typing import Optional, Union, Tuple

import sha3
from logged_groups import logged_group
from logged_groups import logged_group, LogMng
from web3.auto import w3

from ..common_neon.address import EthereumAddress
Expand Down Expand Up @@ -450,13 +449,13 @@ def eth_sendRawTransaction(self, rawTrx: str) -> str:
neon_tx_cfg, emulating_result = self.precheck(trx)

self._stat_tx_success()

mempool_tx_request = MemPoolRequest(signature=eth_signature,
req_id = LogMng.get_logging_context().get("req_id")
mempool_tx_request = MemPoolRequest(req_id=req_id,
signature=eth_signature,
neon_tx=trx,
neon_tx_exec_cfg=neon_tx_cfg,
emulating_result=emulating_result)
self._mempool_client.send_raw_transaction(mempool_tx_request)

return eth_signature

except PendingTxError as err:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ ethereum
py-solc-x==1.1.0
flask
prometheus_client==0.13.1
git+https://github.com/neonlabsorg/[email protected].4
git+https://github.com/neonlabsorg/[email protected].5