Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5984c19
cherrypick part of changes
Dec 1, 2021
613a469
create indexer.py
Dec 1, 2021
c0d0e6c
remove solana_receipts_update.py
Dec 1, 2021
3f6b5c4
Cherry pick files from old branch
Dec 1, 2021
0790298
add requirement
Dec 1, 2021
340c854
fix refactoring issues
Dec 1, 2021
7449b38
Fix inspection issues
Dec 1, 2021
b3dacfa
fix last issue
Dec 1, 2021
a50cd47
Merge branch '336_indexer_refactoring' into 337_сreate_base_airdroppe…
Dec 1, 2021
2b8f879
Merge remote-tracking branch 'origin/develop' into 337_сreate_base_ai…
Dec 2, 2021
f51f2ed
simplify tests
Dec 2, 2021
6678924
add test
Dec 2, 2021
5d454b7
Merge remote-tracking branch 'origin/develop' into 337_сreate_base_ai…
Dec 3, 2021
add136a
add price provider
Dec 1, 2021
9a4be44
fix PriceProvider, add test
Dec 1, 2021
07aaca8
Add tests. Check worn on all nets
Dec 1, 2021
7a46c12
refactoring
Dec 1, 2021
2fc1424
integrate price_provider into airdropper
Dec 2, 2021
d157d67
integrate price provider
Dec 2, 2021
8a6abfd
use new faucet method
Dec 3, 2021
3d4dec9
add new parameter to airdropper main
Dec 3, 2021
5c29832
Test discriptions for airdropper
Dec 3, 2021
6a4efdc
Comments for price provider tests
Dec 3, 2021
14aeeed
remove unnecessary comment
Dec 3, 2021
bd35791
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 3, 2021
ff1f557
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 6, 2021
c28ca8e
fix error
Dec 6, 2021
8c68755
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 7, 2021
5325895
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 8, 2021
a1cbad2
fix airdropper run
Dec 10, 2021
4ed6d78
remove duplicated code
Dec 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix inspection issues
  • Loading branch information
ivanl committed Dec 1, 2021
commit 7449b3817ae66da1fb6ffadbd603b0b5926afc5b
62 changes: 59 additions & 3 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
from proxy.indexer.indexer_base import logger, IndexerBase
from proxy.indexer.indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
import base58
import rlp
import json
import os
import time
import logging
from multiprocessing.dummy import Pool as ThreadPool


try:
from utils import check_error, get_trx_results, get_trx_receipts, LogDB
from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from sql_dict import SQLDict
except ImportError:
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from .sql_dict import SQLDict

CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))
UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16

class HolderStruct:
def __init__(self, storage_account):
Expand Down Expand Up @@ -48,17 +52,36 @@ def __init__(self,
evm_loader_id,
log_level = 'INFO'):
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level)

self.canceller = Canceller()
self.logs_db = LogDB()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
self.constants = SQLDict(tablename="constants")
if 'last_block' not in self.constants:
self.constants['last_block'] = 0
self.blocked_storages = {}


def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}


def process_receipts(self):
counter = 0
holder_table = {}
continue_table = {}
trx_table = {}

for signature in self.transaction_order:
counter += 1

Expand Down Expand Up @@ -410,6 +433,39 @@ def submit_transaction(self, trx_struct):
logger.debug(trx_struct.eth_signature + " " + status)


def gather_blocks(self):
max_slot = self.client.get_slot(commitment="recent")["result"]

last_block = self.constants['last_block']
if last_block + UPDATE_BLOCK_COUNT < max_slot:
max_slot = last_block + UPDATE_BLOCK_COUNT
slots = self.client._provider.make_request("getBlocks", last_block, max_slot, {"commitment": "confirmed"})["result"]

pool = ThreadPool(PARALLEL_REQUESTS)
results = pool.map(self.get_block, slots)

for block_result in results:
(slot, block_hash) = block_result
self.blocks_by_hash[block_hash] = slot

self.constants['last_block'] = max_slot


def get_block(self, slot):
retry = True

while retry:
try:
block = self.client._provider.make_request("getBlock", slot, {"commitment":"confirmed", "transactionDetails":"none", "rewards":False})['result']
block_hash = '0x' + base58.b58decode(block['blockhash']).hex()
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

return (slot, block_hash)


def run_indexer(solana_url,
evm_loader_id,
log_level = 'INFO'):
Expand Down
71 changes: 9 additions & 62 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base58
import os
import time
import logging
Expand All @@ -7,10 +6,8 @@
from typing import Dict, Union

try:
from utils import Canceller
from sql_dict import SQLDict
except ImportError:
from .utils import Canceller
from .sql_dict import SQLDict


Expand All @@ -22,7 +19,6 @@
DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]

UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16

log_levels = {
'DEBUG': logging.DEBUG,
Expand All @@ -39,38 +35,28 @@ def __init__(self,
solana_url,
evm_loader_id,
log_level):
logger.setLevel(log_levels.get(log_level, logging.INFO))

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.canceller = Canceller()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.constants = SQLDict(tablename="constants")

logger.setLevel(log_levels.get(log_level, logging.INFO))

self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
if 'last_block' not in self.constants:
self.constants['last_block'] = 0
self.blocked_storages = {}
self.counter_ = 0


def run(self):
while (True):
try:
logger.debug("Start indexing")
self.gather_unknown_transactions()
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
self.process_functions()
except Exception as err:
logger.debug("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)
logger.warning("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)


def process_functions(self):
logger.debug("Start indexing")
self.gather_unknown_transactions()


def gather_unknown_transactions(self):
Expand Down Expand Up @@ -161,42 +147,3 @@ def get_tx_receipts(self, solana_signature):
logger.debug(self.counter_)

# return (solana_signature, trx)


def process_receipts(self): raise Exception('NotImplemented')

def gather_blocks(self):
max_slot = self.client.get_slot(commitment="recent")["result"]

last_block = self.constants['last_block']
if last_block + UPDATE_BLOCK_COUNT < max_slot:
max_slot = last_block + UPDATE_BLOCK_COUNT
slots = self.client._provider.make_request("getBlocks", last_block, max_slot, {"commitment": "confirmed"})["result"]

pool = ThreadPool(PARALLEL_REQUESTS)
results = pool.map(self.get_block, slots)

for block_result in results:
(slot, block_hash) = block_result
self.blocks_by_hash[block_hash] = slot

self.constants['last_block'] = max_slot


def get_block(self, slot):
retry = True

while retry:
try:
block = self.client._provider.make_request("getBlock", slot, {"commitment":"confirmed", "transactionDetails":"none", "rewards":False})['result']
block_hash = '0x' + base58.b58decode(block['blockhash']).hex()
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

return (slot, block_hash)