Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8d079ed
introduce logged_based
Dec 27, 2021
4b609b1
Get used with logged_group
Dec 29, 2021
a63d17d
Provide logged group on the rest of code base
Dec 29, 2021
4c3ffad
Merge remote-tracking branch 'origin/develop' into 422_introduce_logg…
Dec 29, 2021
b4eda35
Add Airdropper logged_group
Dec 29, 2021
8275f07
Spit and polish
Dec 29, 2021
b3ecaae
Spit and polish
Dec 29, 2021
c8d51ff
Spit and polish
Dec 29, 2021
e5854f4
Spit and polish
Dec 29, 2021
1c36d89
Spit and polish
Dec 29, 2021
bea6cfa
Spit and polish
Dec 29, 2021
465754e
Spit and polish
Dec 29, 2021
f7a055a
rollback proxy.py
Dec 29, 2021
c345177
spit and polish
Dec 29, 2021
ab79eb0
Remove log_level param from airdropper
Dec 29, 2021
76f914e
Spit and polish
Dec 29, 2021
c5cbdba
Fix indexer base init error
Dec 29, 2021
ee87cb7
Fix indexer base init error
Dec 29, 2021
1218d78
Reduce Airdropper level
Dec 30, 2021
523e44f
roll back get_measurements prefix
Dec 30, 2021
dcdf481
Get logged-groups package from different place
Dec 30, 2021
13242b0
install git to resolve python dependency
Dec 30, 2021
b32b936
Spit on Dockerfile and polish
Dec 30, 2021
35a51b7
Fix get_measurements errors parsig command
Dec 30, 2021
5372136
Merge remote-tracking branch 'origin/develop' into 422_introduce_logg…
Dec 30, 2021
125c56e
Polish after merge
Dec 30, 2021
3f759ac
Fix
Dec 30, 2021
3862077
fix
Dec 30, 2021
a7b1de2
fix
Dec 30, 2021
ed83fe6
Merge branch 'develop' into 422_introduce_logged_groups
Jan 10, 2022
440ba11
fix
Jan 10, 2022
7989afb
fix
Jan 10, 2022
ccdd299
spit and polish
Jan 10, 2022
0b81aca
fix errors
Jan 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
introduce logged_based
  • Loading branch information
rozhkovdmitrii committed Dec 27, 2021
commit 8d079edfa3ce210c5231bb58137ee1b61d0be642
7 changes: 7 additions & 0 deletions log_cfg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"format": null,
"colored": true,
"groups": {
"Indexer": "DEBUG"
}
}
29 changes: 16 additions & 13 deletions proxy/indexer/airdropper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from proxy.indexer.indexer_base import IndexerBase, logger
from proxy.indexer.indexer_base import IndexerBase
from proxy.indexer.price_provider import PriceProvider, mainnet_solana, mainnet_price_accounts
import os
import requests
import base58
import json
import logging
from logged_based_class import logged_based

try:
from utils import check_error
Expand All @@ -17,6 +18,8 @@
AIRDROP_AMOUNT_SOL = ACCOUNT_CREATION_PRICE_SOL / 2
NEON_PRICE_USD = 0.25


@logged_based("Indexer")
class Airdropper(IndexerBase):
def __init__(self,
solana_url,
Expand Down Expand Up @@ -76,23 +79,23 @@ def _airdrop_to(self, create_acc):

sol_price_usd = self.price_provider.get_price('SOL/USD')
if sol_price_usd is None:
logger.warning("Failed to get SOL/USD price")
self.warning("Failed to get SOL/USD price")
return

logger.info(f'SOL/USD = ${sol_price_usd}')
self.info(f'SOL/USD = ${sol_price_usd}')
airdrop_amount_usd = AIRDROP_AMOUNT_SOL * sol_price_usd
logger.info(f"Airdrop amount: ${airdrop_amount_usd}")
logger.info(f"NEON price: ${NEON_PRICE_USD}")
self.info(f"Airdrop amount: ${airdrop_amount_usd}")
self.info(f"NEON price: ${NEON_PRICE_USD}")
airdrop_amount_neon = airdrop_amount_usd / NEON_PRICE_USD
logger.info(f"Airdrop {airdrop_amount_neon} NEONs to address: {eth_address}")
self.info(f"Airdrop {airdrop_amount_neon} NEONs to address: {eth_address}")
airdrop_galans = int(airdrop_amount_neon * pow(10, self.neon_decimals))

json_data = { 'wallet': eth_address, 'amount': airdrop_galans }
resp = requests.post(self.faucet_url + '/request_neon_in_galans', json = json_data)
if not resp.ok:
logger.warning(f'Failed to airdrop: {resp.status_code}')
self.warning(f'Failed to airdrop: {resp.status_code}')
return

self.airdrop_ready[eth_address] = create_acc


Expand Down Expand Up @@ -135,7 +138,7 @@ def find_instructions(trx, predicate):

def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Process receipts")
self.debug("Process receipts")
self.process_receipts()


Expand All @@ -146,11 +149,11 @@ def process_receipts(self):
if signature in self.transaction_receipts:
trx = self.transaction_receipts[signature]
if trx is None:
logger.error("trx is None")
self.error("trx is None")
del self.transaction_receipts[signature]
continue
if 'slot' not in trx:
logger.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
self.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
exit()
if trx['transaction']['message']['instructions'] is not None:
self.process_trx_airdropper_mode(trx)
Expand All @@ -164,8 +167,8 @@ def run_airdropper(solana_url,
price_update_interval = 60,
neon_decimals = 9):
logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s')
logger.setLevel(logging.DEBUG)
logger.info(f"""Running indexer with params:
self.setLevel(logging.DEBUG)
self.info(f"""Running indexer with params:
solana_url: {solana_url},
evm_loader_id: {evm_loader_id},
log_level: {log_level},
Expand Down
66 changes: 30 additions & 36 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from proxy.indexer.indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
from proxy.indexer.indexer_base import IndexerBase, PARALLEL_REQUESTS
import base58
import json
import logging
import os
import rlp
import time
import logging
from multiprocessing.dummy import Pool as ThreadPool

from multiprocessing.dummy import Pool as ThreadPool
from logged_based_class import logged_based

try:
from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
Expand Down Expand Up @@ -48,6 +47,7 @@ def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures,
self.slot = slot


@logged_based("Indexer")
class Indexer(IndexerBase):
def __init__(self,
solana_url,
Expand All @@ -66,14 +66,13 @@ def __init__(self,
self.constants['last_block'] = 0
self.blocked_storages = {}


def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Process receipts")
self.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}

Expand All @@ -93,11 +92,11 @@ def process_receipts(self):
if signature in self.transaction_receipts:
trx = self.transaction_receipts[signature]
if trx is None:
logger.error("trx is None")
self.error("trx is None")
del self.transaction_receipts[signature]
continue
if 'slot' not in trx:
logger.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
self.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
exit()
slot = trx['slot']
if trx['transaction']['message']['instructions'] is not None:
Expand Down Expand Up @@ -151,7 +150,7 @@ def process_receipts(self):
try:
(eth_trx, eth_signature, from_address) = get_trx_receipts(unsigned_msg, signature)
if len(eth_trx) / 2 > holder_table[write_account].max_written:
logger.debug("WRITE got {} exp {}".format(len(eth_trx), holder_table[write_account].max_written))
self.debug("WRITE got {} exp {}".format(len(eth_trx), holder_table[write_account].max_written))
continue

if storage_account in continue_table:
Expand All @@ -171,8 +170,8 @@ def process_receipts(self):

del continue_table[storage_account]
else:
logger.error("Storage not found")
logger.error(eth_signature, "unknown")
self.error("Storage not found")
self.error(eth_signature, "unknown")
# raise

del holder_table[write_account]
Expand All @@ -187,7 +186,7 @@ def process_receipts(self):
# logger.debug("unsupported operand type")
pass
else:
logger.debug("could not parse trx {}".format(err))
self.debug("could not parse trx {}".format(err))
raise

elif instruction_data[0] == 0x01: # Finalize
Expand Down Expand Up @@ -234,7 +233,7 @@ def process_receipts(self):
slot
)
else:
logger.error("RESULT NOT FOUND IN 05\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
self.error("RESULT NOT FOUND IN 05\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))

elif instruction_data[0] == 0x09 or instruction_data[0] == 0x13: # PartialCallFromRawEthereumTX PartialCallFromRawEthereumTXv02
# if instruction_data[0] == 0x09:
Expand Down Expand Up @@ -269,7 +268,7 @@ def process_receipts(self):
if storage_account in continue_table:
continue_result = continue_table[storage_account]
if continue_result.accounts != blocked_accounts:
logger.error("Strange behavior. Pay attention. BLOCKED ACCOUNTS NOT EQUAL")
self.error("Strange behavior. Pay attention. BLOCKED ACCOUNTS NOT EQUAL")
trx_table[eth_signature].got_result = continue_result.results
trx_table[eth_signature].signatures += continue_result.signatures
trx_table[eth_signature].slot = max(trx_table[eth_signature].slot, continue_result.slot)
Expand All @@ -292,9 +291,9 @@ def process_receipts(self):

if got_result is not None:
if continue_table[storage_account].results is not None:
logger.error("Strange behavior. Pay attention. RESULT ALREADY EXISTS IN CONTINUE TABLE")
self.error("Strange behavior. Pay attention. RESULT ALREADY EXISTS IN CONTINUE TABLE")
if continue_table[storage_account].accounts != blocked_accounts:
logger.error("Strange behavior. Pay attention. BLOCKED ACCOUNTS NOT EQUAL")
self.error("Strange behavior. Pay attention. BLOCKED ACCOUNTS NOT EQUAL")

continue_table[storage_account].results = got_result
else:
Expand All @@ -316,7 +315,7 @@ def process_receipts(self):

if holder_account in holder_table:
if holder_table[holder_account].storage_account != storage_account:
logger.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT")
self.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT")
holder_table[holder_account] = HolderStruct(storage_account)
else:
holder_table[holder_account] = HolderStruct(storage_account)
Expand Down Expand Up @@ -385,25 +384,25 @@ def process_receipts(self):

if holder_account in holder_table:
if holder_table[holder_account].storage_account != storage_account:
logger.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT")
self.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT")
holder_table[holder_account] = HolderStruct(storage_account)
else:
logger.error("Strange behavior. Pay attention. HOLDER ACCOUNT NOT FOUND")
self.error("Strange behavior. Pay attention. HOLDER ACCOUNT NOT FOUND")
holder_table[holder_account] = HolderStruct(storage_account)

if got_result is not None:
if continue_table[storage_account].results:
logger.error("Strange behavior. Pay attention. RESULT ALREADY EXISTS IN CONTINUE TABLE")
self.error("Strange behavior. Pay attention. RESULT ALREADY EXISTS IN CONTINUE TABLE")
if continue_table[storage_account].accounts != blocked_accounts:
logger.error("Strange behavior. Pay attention. BLOCKED ACCOUNTS NOT EQUAL")
self.error("Strange behavior. Pay attention. BLOCKED ACCOUNTS NOT EQUAL")

continue_table[storage_account].results = got_result
else:
continue_table[storage_account] = ContinueStruct(signature, got_result, slot, blocked_accounts)
holder_table[holder_account] = HolderStruct(storage_account)

if instruction_data[0] > 0x16:
logger.debug("{:>10} {:>6} Unknown 0x{}".format(slot, counter, instruction_data.hex()))
self.debug("{:>10} {:>6} Unknown 0x{}".format(slot, counter, instruction_data.hex()))

pass

Expand All @@ -413,12 +412,12 @@ def process_receipts(self):
elif trx_struct.storage is not None:
if not self.submit_transaction_part(trx_struct):
if abs(trx_struct.slot - self.current_slot) > CANCEL_TIMEOUT:
logger.debug("Probably blocked")
logger.debug(trx_struct.eth_signature)
logger.debug(trx_struct.signatures)
self.debug("Probably blocked")
self.debug(trx_struct.eth_signature)
self.debug(trx_struct.signatures)
self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts)
else:
logger.error(trx_struct)
self.error(trx_struct)


def submit_transaction(self, trx_struct):
Expand Down Expand Up @@ -446,7 +445,7 @@ def submit_transaction(self, trx_struct):
}
self.blocks_by_hash[block_hash] = slot

logger.debug(trx_struct.eth_signature + " " + status)
self.debug(trx_struct.eth_signature + " " + status)


def submit_transaction_part(self, trx_struct):
Expand Down Expand Up @@ -493,7 +492,7 @@ def get_block(self, slot):
block_hash = '0x' + base58.b58decode(block['blockhash']).hex()
retry = False
except Exception as err:
logger.debug(err)
self.debug(err)
time.sleep(1)

return (slot, block_hash)
Expand All @@ -502,12 +501,7 @@ def get_block(self, slot):
def run_indexer(solana_url,
evm_loader_id,
log_level = 'DEBUG'):
logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s')
logger.setLevel(logging.DEBUG)
logger.info(f"""Running indexer with params:
solana_url: {solana_url},
evm_loader_id: {evm_loader_id},
log_level: {log_level}""")


indexer = Indexer(solana_url,
evm_loader_id,
Expand Down
Loading