Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8d079ed
introduce logged_based
Dec 27, 2021
4b609b1
Get used with logged_group
Dec 29, 2021
a63d17d
Provide logged group on the rest of code base
Dec 29, 2021
4c3ffad
Merge remote-tracking branch 'origin/develop' into 422_introduce_logg…
Dec 29, 2021
b4eda35
Add Airdropper logged_group
Dec 29, 2021
8275f07
Spit and polish
Dec 29, 2021
b3ecaae
Spit and polish
Dec 29, 2021
c8d51ff
Spit and polish
Dec 29, 2021
e5854f4
Spit and polish
Dec 29, 2021
1c36d89
Spit and polish
Dec 29, 2021
bea6cfa
Spit and polish
Dec 29, 2021
465754e
Spit and polish
Dec 29, 2021
f7a055a
rollback proxy.py
Dec 29, 2021
c345177
spit and polish
Dec 29, 2021
ab79eb0
Remove log_level param from airdropper
Dec 29, 2021
76f914e
Spit and polish
Dec 29, 2021
c5cbdba
Fix indexer base init error
Dec 29, 2021
ee87cb7
Fix indexer base init error
Dec 29, 2021
1218d78
Reduce Airdropper level
Dec 30, 2021
523e44f
roll back get_measurements prefix
Dec 30, 2021
dcdf481
Get logged-groups package from different place
Dec 30, 2021
13242b0
install git to resolve python dependency
Dec 30, 2021
b32b936
Spit on Dockerfile and polish
Dec 30, 2021
35a51b7
Fix get_measurements errors parsing command
Dec 30, 2021
5372136
Merge remote-tracking branch 'origin/develop' into 422_introduce_logg…
Dec 30, 2021
125c56e
Polish after merge
Dec 30, 2021
3f759ac
Fix
Dec 30, 2021
3862077
fix
Dec 30, 2021
a7b1de2
fix
Dec 30, 2021
ed83fe6
Merge branch 'develop' into 422_introduce_logged_groups
Jan 10, 2022
440ba11
fix
Jan 10, 2022
7989afb
fix
Jan 10, 2022
ccdd299
spit and polish
Jan 10, 2022
0b81aca
fix errors
Jan 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Provide logged group on the rest of code base
  • Loading branch information
rozhkovdmitrii committed Dec 29, 2021
commit a63d17d486635b089db80aae4f917a3b51248156
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
!requirements.txt
!setup.py
!README.md
!log_cfg.json

# Ignore __pycache__ directory
proxy/__pycache__
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ RUN apt update && \

COPY ./requirements.txt /opt
COPY ./proxy/solana-py.patch /opt
COPY ./log_cfg.json /opt

WORKDIR /opt

RUN python3 -m venv venv && \
Expand Down
2 changes: 1 addition & 1 deletion log_cfg.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"format": "%(asctime)23s %(levelname)8s %(process)6d:%(threadName)-10s %(class)20s:%(class_id)-8s %(message)s",
"colored": true,
"colored": false,
"logged_groups": {
"Indexer": "CRITICAL",
"Proxy": "DEBUG"
Expand Down
28 changes: 15 additions & 13 deletions proxy/common_neon/solana_interactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
from solana.rpc.types import TxOpts
from solana.transaction import Transaction

from logged_groups import logged_group


from .costs import update_transaction_cost
from .utils import get_from_dict
from ..environment import EVM_LOADER_ID, CONFIRMATION_CHECK_DELAY, LOG_SENDING_SOLANA_TRANSACTION, RETRY_ON_FAIL

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@logged_group("Proxy")
class SolanaInteractor:
def __init__(self, signer, client: SolanaClient) -> None:
self.signer = signer
Expand All @@ -41,11 +42,11 @@ def get_account_info(self, storage_account):
}

result = self.client._provider.make_request("getAccountInfo", str(storage_account), opts)
logger.debug("\n{}".format(json.dumps(result, indent=4, sort_keys=True)))
self.debug("\n{}".format(json.dumps(result, indent=4, sort_keys=True)))

info = result['result']['value']
if info is None:
logger.debug("Can't get information about {}".format(storage_account))
self.debug("Can't get information about {}".format(storage_account))
return None

data = base64.b64decode(info['data'][0])
Expand Down Expand Up @@ -103,7 +104,7 @@ def send_transaction_unconfirmed(self, txn: Transaction):
except SendTransactionError as err:
err_type = get_from_dict(err.result, "data", "err")
if err_type is not None and isinstance(err_type, str) and err_type == "BlockhashNotFound":
logger.debug("BlockhashNotFound {}".format(blockhash))
self.debug("BlockhashNotFound {}".format(blockhash))
time.sleep(0.1)
continue
raise
Expand All @@ -119,7 +120,7 @@ def collect_result(self, reciept, eth_trx, reason=None):

def send_measured_transaction(self, trx, eth_trx, reason):
if LOG_SENDING_SOLANA_TRANSACTION:
logger.debug("send_measured_transaction for reason %s: %s ", reason, trx.__dict__)
self.debug("send_measured_transaction for reason %s: %s ", reason, trx.__dict__)
result = self.send_transaction(trx, eth_trx, reason=reason)
self.get_measurements(result)
return result
Expand All @@ -129,20 +130,20 @@ def send_measured_transaction(self, trx, eth_trx, reason):
def get_measurements(self, result):
try:
measurements = self.extract_measurements_from_receipt(result)
for m in measurements: logger.info(json.dumps(m))
for m in measurements: self.info(json.dumps(m))
except Exception as err:
logger.error("Can't get measurements %s"%err)
logger.info("Failed result: %s"%json.dumps(result, indent=3))
self.error("Can't get measurements %s"%err)
self.info("Failed result: %s"%json.dumps(result, indent=3))


def confirm_transaction(self, tx_sig, confirmations=0):
"""Confirm a transaction."""
TIMEOUT = 30 # 30 seconds pylint: disable=invalid-name
elapsed_time = 0
while elapsed_time < TIMEOUT:
logger.debug('confirm_transaction for %s', tx_sig)
self.debug('confirm_transaction for %s', tx_sig)
resp = self.client.get_signature_statuses([tx_sig])
logger.debug('confirm_transaction: %s', resp)
self.debug('confirm_transaction: %s', resp)
if resp["result"]:
status = resp['result']['value'][0]
if status and (status['confirmationStatus'] == 'finalized' or \
Expand All @@ -160,7 +161,8 @@ def collect_results(self, receipts, eth_trx=None, reason=None):
return results

@staticmethod
def extract_measurements_from_receipt(receipt):
@logged_group("Proxy")
def extract_measurements_from_receipt(receipt, *, logger):
if check_for_errors(receipt):
logger.warning("Can't get measurements from receipt with error")
logger.info("Failed result: %s"%json.dumps(receipt, indent=3))
Expand Down
12 changes: 6 additions & 6 deletions proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from multiprocessing.dummy import Pool as ThreadPool
from logged_groups import logged_group

# try:
# from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
# from sql_dict import SQLDict
# except ImportError:
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from .sql_dict import SQLDict
try:
from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from sql_dict import SQLDict
except ImportError:
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from .sql_dict import SQLDict

CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))
UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16
Expand Down
17 changes: 8 additions & 9 deletions proxy/indexer/price_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import base58
import struct
from datetime import datetime
from logging import Logger

logger = Logger(__name__)
from logged_groups import logged_group

field_info = {
'expo': { 'pos': 20, 'len': 4, 'format': '<i' },
Expand Down Expand Up @@ -41,6 +39,7 @@ def _unpack_field(raw_data: bytes, field_name: str):
PRICE_STATUS_TRADING = 1


@logged_group("Indexer")
class PriceProvider:
def __init__(self, solana_url: str, default_upd_int: int, price_accounts=None):
self.client = Client(solana_url)
Expand All @@ -59,23 +58,23 @@ def _get_current_time(self):
def _read_price(self, pairname):
acc_id = self.price_accounts.get(pairname, None)
if acc_id is None:
logger.warning(f'No account found for pair {pairname}')
self.warning(f'No account found for pair {pairname}')
return None

response = self.client.get_account_info(PublicKey(acc_id))
result = response.get('result', None)
if result is None:
logger.warning(f'Failed to read account data for account {acc_id}')
self.warning(f'Failed to read account data for account {acc_id}')
return None

value = result.get('value', None)
if value is None:
logger.warning(f'Failed to read account data for account {acc_id}')
self.warning(f'Failed to read account data for account {acc_id}')
return None

data = value.get('data', None)
if not isinstance(data, list) or len(data) != 2:
logger.warning(f'Failed to read account data for account {acc_id}')
self.warning(f'Failed to read account data for account {acc_id}')
return None

encoding = data[1]
Expand All @@ -84,12 +83,12 @@ def _read_price(self, pairname):
elif encoding == 'base64':
data = base64.b64decode(data[0])
else:
logger.warning(f'Unknown encoding {encoding}')
self.warning(f'Unknown encoding {encoding}')
return None

status = _unpack_field(data, 'agg.status')
if status != PRICE_STATUS_TRADING: # not Trading
logger.warning(f'Price status is {status}')
self.warning(f'Price status is {status}')
return None

expo = _unpack_field(data, 'expo')
Expand Down
5 changes: 0 additions & 5 deletions proxy/indexer/sql_dict.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import psycopg2
import os
import logging
from collections.abc import MutableMapping

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
Expand All @@ -14,10 +13,6 @@
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))
Expand Down
41 changes: 19 additions & 22 deletions proxy/indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
from spl.token.constants import TOKEN_PROGRAM_ID
from spl.token.instructions import get_associated_token_address
from web3.auto.gethdev import w3
from logged_groups import logged_group

from ..common_neon.constants import SYSVAR_INSTRUCTION_PUBKEY, INCINERATOR_PUBKEY, KECCAK_PROGRAM
from ..common_neon.layouts import STORAGE_ACCOUNT_INFO_LAYOUT
from ..environment import SOLANA_URL, EVM_LOADER_ID, ETH_TOKEN_MINT_ID


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def check_error(trx):
if 'meta' in trx and 'err' in trx['meta'] and trx['meta']['err'] is not None:
# logger.debug("Got err trx")
Expand Down Expand Up @@ -114,7 +111,8 @@ def get_trx_receipts(unsigned_msg, signature):
return (trx_raw.hex(), eth_signature, from_address)


def get_account_list(client, storage_account):
@logged_group("Indexer")
def get_account_list(client, storage_account, *, logger):
opts = {
"encoding": "base64",
"commitment": "confirmed",
Expand Down Expand Up @@ -153,8 +151,7 @@ def get_account_list(client, storage_account):
return None




@logged_group("Indexer")
class LogDB:
def __init__(self):
POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
Expand Down Expand Up @@ -202,12 +199,12 @@ def push_logs(self, logs):
)
)
if len(rows):
# logger.debug(rows)
# self.debug(rows)
cur = self.conn.cursor()
cur.executemany('INSERT INTO logs VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING', rows)
self.conn.commit()
else:
logger.debug("NO LOGS")
self.debug("NO LOGS")


def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = None, blockHash = None):
Expand Down Expand Up @@ -254,8 +251,8 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No
if idx < len(queries) - 1:
query_string += " AND "

logger.debug(query_string)
logger.debug(params)
self.debug(query_string)
self.debug(params)

cur = self.conn.cursor()
cur.execute(query_string, tuple(params))
Expand Down Expand Up @@ -304,10 +301,10 @@ def call(self, *args):
cmd = ["solana",
"--url", SOLANA_URL,
] + list(args)
logger.debug(cmd)
self.debug(cmd)
return subprocess.check_output(cmd, universal_newlines=True)
except subprocess.CalledProcessError as err:
logger.debug("ERR: solana error {}".format(err))
self.debug("ERR: solana error {}".format(err))
raise


Expand All @@ -327,19 +324,19 @@ def unlock_accounts(self, blocked_storages):
(eth_trx, blocked_accs) = trx_accs
acc_list = get_account_list(self.client, storage)
if eth_trx is None:
logger.error("trx is None")
self.error("trx is None")
continue
if blocked_accs is None:
logger.error("blocked_accs is None")
self.error("blocked_accs is None")
continue
if acc_list is None:
logger.error("acc_list is None. Storage is empty")
logger.error(storage)
self.error("acc_list is None. Storage is empty")
self.error(storage)
continue

eth_trx = rlp.decode(bytes.fromhex(eth_trx), EthTrx)
if acc_list != blocked_accs:
logger.error("acc_list != blocked_accs")
self.error("acc_list != blocked_accs")
continue

if acc_list is not None:
Expand All @@ -361,11 +358,11 @@ def unlock_accounts(self, blocked_storages):
keys=keys
))

logger.debug("Send Cancel")
self.debug("Send Cancel")
try:
self.client.send_transaction(trx, self.signer, opts=TxOpts(preflight_commitment=Confirmed))
except Exception as err:
logger.error(err)
self.error(err)
else:
logger.debug("Canceled")
logger.debug(acc_list)
self.debug("Canceled")
self.debug(acc_list)
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ psycopg2-binary
ethereum
py-solc-x==1.1.0
flask
logged-groups=1.1.2
logged-groups==1.1.2