Skip to content

Commit 2b79715

Browse files
authored
Find and Cancel hanged transactions #196 (#229)
* get confirmed blocks * remove some logging * view hanged transaction info * parallel requests * transaction processing * canceller * add test * add airdrop for test
1 parent f1d88a6 commit 2b79715

File tree

6 files changed

+487
-36
lines changed

6 files changed

+487
-36
lines changed

proxy/deploy-test.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ export $(/spl/bin/neon-cli --evm_loader JxujFZpNBPADbfw2MnPPgnnFGruzp2ELSFWPQgrj
88

99
curl -v --header "Content-Type: application/json" --data '{"method":"eth_blockNumber","id":1,"jsonrpc":"2.0","params":[]}' $PROXY_URL
1010

11+
solana config set -u $SOLANA_URL
12+
solana config get
13+
solana address
14+
solana airdrop 1000
15+
solana balance
16+
1117
python3 -m unittest discover -v -p 'test*.py'
1218

1319
echo "Deploy test success"

proxy/indexer/solana_receipts_update.py

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,24 @@
99
from sqlitedict import SqliteDict
1010
from typing import Dict, Union
1111

12+
1213
try:
13-
from utils import check_error, get_trx_results, get_trx_receipts, LogDB
14+
from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
1415
except ImportError:
15-
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB
16+
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
1617

1718

1819
solana_url = os.environ.get("SOLANA_URL", "https://api.devnet.solana.com")
1920
evm_loader_id = os.environ.get("EVM_LOADER", "eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU")
2021
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
2122

23+
2224
logger = logging.getLogger(__name__)
2325
logger.setLevel(logging.DEBUG)
2426

27+
DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
28+
HISTORY_START = [DEVNET_HISTORY_START]
29+
2530
UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16
2631

2732
class HolderStruct:
@@ -41,6 +46,7 @@ def __init__(self, signature, results):
4146
class Indexer:
4247
def __init__(self):
4348
self.client = Client(solana_url)
49+
self.canceller = Canceller()
4450
self.logs_db = LogDB(filename="local.db")
4551
self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
4652
self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
@@ -49,11 +55,14 @@ def __init__(self):
4955
self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
5056
self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True)
5157
self.last_slot = 0
58+
self.current_slot = 0
5259
self.transaction_order = []
5360
if 'last_block' not in self.constants:
5461
self.constants['last_block'] = 0
62+
self.blocked_storages = set()
63+
self.counter_ = 0
5564

56-
def run(self):
65+
def run(self, loop = True):
5766
while (True):
5867
try:
5968
logger.debug("Start indexing")
@@ -62,17 +71,24 @@ def run(self):
6271
self.process_receipts()
6372
logger.debug("Start getting blocks")
6473
self.gather_blocks()
74+
logger.debug("Unlock accounts")
75+
self.canceller.unlock_accounts(self.blocked_storages)
76+
self.blocked_storages = set()
6577
except Exception as err:
6678
logger.debug("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)
6779

80+
6881
def gather_unknown_transactions(self):
6982
poll_txs = set()
7083
ordered_txs = []
7184

7285
minimal_tx = None
7386
continue_flag = True
74-
minimal_slot = self.client.get_slot()["result"]
87+
current_slot = self.client.get_slot(commitment="confirmed")["result"]
7588
maximum_slot = self.last_slot
89+
minimal_slot = current_slot
90+
91+
percent = 0
7692

7793
counter = 0
7894
while (continue_flag):
@@ -92,6 +108,11 @@ def gather_unknown_transactions(self):
92108
solana_signature = tx["signature"]
93109
slot = tx["slot"]
94110

111+
if solana_signature in HISTORY_START:
112+
logger.debug(solana_signature)
113+
continue_flag = False
114+
break
115+
95116
ordered_txs.append(solana_signature)
96117

97118
if solana_signature not in self.transaction_receipts:
@@ -110,11 +131,7 @@ def gather_unknown_transactions(self):
110131

111132
logger.debug("start getting receipts")
112133
pool = ThreadPool(PARALLEL_REQUESTS)
113-
results = pool.map(self.get_tx_receipts, poll_txs)
114-
115-
for transaction in results:
116-
(solana_signature, trx) = transaction
117-
self.transaction_receipts[solana_signature] = trx
134+
pool.map(self.get_tx_receipts, poll_txs)
118135

119136
if len(self.transaction_order):
120137
index = 0
@@ -128,21 +145,29 @@ def gather_unknown_transactions(self):
128145
self.transaction_order = ordered_txs
129146

130147
self.last_slot = maximum_slot
148+
self.current_slot = current_slot
149+
150+
self.counter_ = 0
131151

132152

133153
def get_tx_receipts(self, solana_signature):
134-
trx = None
154+
# trx = None
135155
retry = True
136156

137157
while retry:
138158
try:
139159
trx = self.client.get_confirmed_transaction(solana_signature)['result']
160+
self.transaction_receipts[solana_signature] = trx
140161
retry = False
141162
except Exception as err:
142163
logger.debug(err)
143164
time.sleep(1)
144165

145-
return (solana_signature, trx)
166+
self.counter_ += 1
167+
if self.counter_ % 100 == 0:
168+
logger.debug(self.counter_)
169+
170+
# return (solana_signature, trx)
146171

147172

148173
def process_receipts(self):
@@ -160,6 +185,7 @@ def process_receipts(self):
160185
trx = self.transaction_receipts[signature]
161186
if trx is None:
162187
logger.error("trx is None")
188+
del self.transaction_receipts[signature]
163189
continue
164190
if 'slot' not in trx:
165191
logger.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
@@ -219,12 +245,16 @@ def process_receipts(self):
219245
# raise
220246

221247
del holder_table[write_account]
222-
except rlp.exceptions.DecodingError:
223-
logger.debug("DecodingError")
248+
except rlp.exceptions.RLPException:
224249
pass
225250
except Exception as err:
226-
logger.debug("could not parse trx {}".format(err))
227-
pass
251+
if str(err).startswith("unhashable type"):
252+
pass
253+
elif str(err).startswith("unsupported operand type"):
254+
pass
255+
else:
256+
logger.debug("could not parse trx {}".format(err))
257+
raise
228258

229259
elif instruction_data[0] == 0x01: # Finalize
230260
# logger.debug("{:>10} {:>6} Finalize 0x{}".format(slot, counter, instruction_data.hex()))
@@ -283,8 +313,7 @@ def process_receipts(self):
283313

284314
del continue_table[storage_account]
285315
else:
286-
# logger.debug("Storage not found")
287-
pass
316+
self.add_hunged_storage(trx, storage_account)
288317

289318
elif instruction_data[0] == 0x0a: # Continue
290319
# logger.debug("{:>10} {:>6} Continue 0x{}".format(slot, counter, instruction_data.hex()))
@@ -297,8 +326,8 @@ def process_receipts(self):
297326
got_result = get_trx_results(trx)
298327
if got_result is not None:
299328
continue_table[storage_account] = ContinueStruct(signature, got_result)
300-
# else:
301-
# logger.error("Result not found")
329+
else:
330+
self.add_hunged_storage(trx, storage_account)
302331

303332

304333
elif instruction_data[0] == 0x0b: # ExecuteTrxFromAccountDataIterative
@@ -316,12 +345,15 @@ def process_receipts(self):
316345
holder_table[holder_account] = HolderStruct(storage_account)
317346
else:
318347
holder_table[holder_account] = HolderStruct(storage_account)
348+
else:
349+
self.add_hunged_storage(trx, storage_account)
350+
319351

320352
elif instruction_data[0] == 0x0c: # Cancel
321353
# logger.debug("{:>10} {:>6} Cancel 0x{}".format(slot, counter, instruction_data.hex()))
322354

323355
storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]]
324-
continue_table[storage_account] = ContinueStruct(signature, (None, 0, None, None, trx['slot']))
356+
continue_table[storage_account] = ContinueStruct(signature, ([], "0x0", 0, [], trx['slot']))
325357

326358
elif instruction_data[0] == 0x0c:
327359
# logger.debug("{:>10} {:>6} PartialCallOrContinueFromRawEthereumTX 0x{}".format(slot, counter, instruction_data.hex()))
@@ -347,6 +379,8 @@ def process_receipts(self):
347379
got_result = get_trx_results(trx)
348380
if got_result is not None:
349381
self.submit_transaction(eth_trx, eth_signature, from_address, got_result, [signature])
382+
else:
383+
self.add_hunged_storage(trx, storage_account)
350384

351385
elif instruction_data[0] == 0x0d:
352386
# logger.debug("{:>10} {:>6} ExecuteTrxFromAccountDataIterativeOrContinue 0x{}".format(slot, counter, instruction_data.hex()))
@@ -368,12 +402,15 @@ def process_receipts(self):
368402
if got_result is not None:
369403
continue_table[storage_account] = ContinueStruct(signature, got_result)
370404
holder_table[holder_account] = HolderStruct(storage_account)
405+
else:
406+
self.add_hunged_storage(trx, storage_account)
371407

372408
if instruction_data[0] > 0x0e:
373409
logger.debug("{:>10} {:>6} Unknown 0x{}".format(slot, counter, instruction_data.hex()))
374410

375411
pass
376412

413+
377414
def submit_transaction(self, eth_trx, eth_signature, from_address, got_result, signatures):
378415
(logs, status, gas_used, return_value, slot) = got_result
379416
(_slot, block_hash) = self.get_block(slot)
@@ -401,6 +438,7 @@ def submit_transaction(self, eth_trx, eth_signature, from_address, got_result, s
401438

402439
logger.debug(eth_signature + " " + status)
403440

441+
404442
def gather_blocks(self):
405443
max_slot = self.client.get_slot(commitment="recent")["result"]
406444

@@ -418,6 +456,7 @@ def gather_blocks(self):
418456

419457
self.constants['last_block'] = max_slot
420458

459+
421460
def get_block(self, slot):
422461
retry = True
423462

@@ -433,11 +472,16 @@ def get_block(self, slot):
433472
return (slot, block_hash)
434473

435474

475+
def add_hunged_storage(self, trx, storage):
476+
if abs(trx['slot'] - self.current_slot) > 16:
477+
self.blocked_storages.add(storage)
478+
479+
436480
def run_indexer():
437481
logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s')
438482
logger.setLevel(logging.DEBUG)
439483
indexer = Indexer()
440-
indexer.run()
484+
indexer.run(False)
441485

442486

443487
if __name__ == "__main__":

0 commit comments

Comments
 (0)