Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e496b27
VERSION = (0, 4, 0)
vasiliy-zaznobin Nov 16, 2021
fe41d64
EVM_LOADER_REVISION:=v0.4.0
vasiliy-zaznobin Nov 16, 2021
5467c50
EVM_LOADER_REVISION:=stable
vasiliy-zaznobin Nov 16, 2021
3c6f4bc
289 implement eth get storage at (#298)
ivandzen Nov 18, 2021
6e1864e
#256 create and airdrop eth account (#259)
rozhkovdmitrii Nov 18, 2021
4880060
#305 Remove extra args from ether2program (#309)
rozhkovdmitrii Nov 19, 2021
531a9e2
Change default logDB postgress pass (#317)
rozhkovdmitrii Nov 23, 2021
6f3cf9a
#311 Create account and airdrop on gas estimation if it's preset
rozhkovdmitrii Nov 24, 2021
452a86d
neonbals/neon-evm#371 Update STORAGE_ACCOUNT_INFO_LAYOUT (#315)
sinev-valentine Nov 24, 2021
08ebfbc
319 add neon cli version handler (#325)
vasiliy-zaznobin Nov 24, 2021
fb01a4e
fix storage account check (#327)
sinev-valentine Nov 24, 2021
ed83d90
#320 add neon proxy version handler (#323)
vasiliy-zaznobin Nov 25, 2021
dea01f4
313 concurrent execution of solana program dump (#314)
ivandzen Nov 25, 2021
024bf02
fix scripts (#329)
ivandzen Nov 25, 2021
cf9112d
#318 JSON_RPC "params" field may be omitted (#322)
mich-master Nov 26, 2021
fa9e43e
#336 indexer refactoring (#340)
ivandzen Dec 2, 2021
c7ef793
#333 fix indexer errors (#334)
otselnik Dec 2, 2021
e74e568
#291 Proxy refactoring (#324)
otselnik Dec 2, 2021
d5a5015
#337 сreate base airdropper service (#343)
ivandzen Dec 3, 2021
50bc6ba
#337 fix running airdropper (#347)
ivandzen Dec 6, 2021
a619b66
#351 fix canceller droping running transactions (#352)
otselnik Dec 6, 2021
d2f8817
349 improve neon proxy logging to filter a request and the correspond…
vasiliy-zaznobin Dec 6, 2021
5d6dab2
#349 Fix using log sending solana transaction (#353)
vasiliy-zaznobin Dec 6, 2021
4045e90
#295 iterative execution (#332)
otselnik Dec 6, 2021
521696f
#354 Check result for errors (#355)
otselnik Dec 7, 2021
fff455e
#360 pass transaction too large upper (#361)
otselnik Dec 7, 2021
e15d847
Merge remote-tracking branch 'origin/develop'
vasiliy-zaznobin Dec 7, 2021
2416591
NEON_PROXY_PKG_VERSION = '0.5.0'
vasiliy-zaznobin Dec 7, 2021
168e428
VERSION = (0, 4, 0)
vasiliy-zaznobin Nov 16, 2021
3125645
EVM_LOADER_REVISION:=v0.4.0
vasiliy-zaznobin Nov 16, 2021
07f175d
EVM_LOADER_REVISION:=stable
vasiliy-zaznobin Nov 16, 2021
0bbb26b
289 implement eth get storage at (#298)
ivandzen Nov 18, 2021
7841b07
#256 create and airdrop eth account (#259)
rozhkovdmitrii Nov 18, 2021
ce05c81
#305 Remove extra args from ether2program (#309)
rozhkovdmitrii Nov 19, 2021
6ca860a
Change default logDB postgress pass (#317)
rozhkovdmitrii Nov 23, 2021
b3b322f
#311 Create account and airdrop on gas estimation if it's preset
rozhkovdmitrii Nov 24, 2021
c860ed0
neonbals/neon-evm#371 Update STORAGE_ACCOUNT_INFO_LAYOUT (#315)
sinev-valentine Nov 24, 2021
9bb3f89
319 add neon cli version handler (#325)
vasiliy-zaznobin Nov 24, 2021
081bc75
fix storage account check (#327)
sinev-valentine Nov 24, 2021
7845ca9
#320 add neon proxy version handler (#323)
vasiliy-zaznobin Nov 25, 2021
f299c57
313 concurrent execution of solana program dump (#314)
ivandzen Nov 25, 2021
0a5dc2d
fix scripts (#329)
ivandzen Nov 25, 2021
7964cad
#318 JSON_RPC "params" field may be omitted (#322)
mich-master Nov 26, 2021
01df8a4
#336 indexer refactoring (#340)
ivandzen Dec 2, 2021
d9a8e51
#333 fix indexer errors (#334)
otselnik Dec 2, 2021
f4b178c
#291 Proxy refactoring (#324)
otselnik Dec 2, 2021
5ced809
#337 сreate base airdropper service (#343)
ivandzen Dec 3, 2021
ae7b392
#337 fix running airdropper (#347)
ivandzen Dec 6, 2021
9feca6a
#351 fix canceller droping running transactions (#352)
otselnik Dec 6, 2021
2f0a0df
349 improve neon proxy logging to filter a request and the correspond…
vasiliy-zaznobin Dec 6, 2021
3b36864
#349 Fix using log sending solana transaction (#353)
vasiliy-zaznobin Dec 6, 2021
6bb8509
#295 iterative execution (#332)
otselnik Dec 6, 2021
56044ea
#354 Check result for errors (#355)
otselnik Dec 7, 2021
0bad34c
#360 pass transaction too large upper (#361)
otselnik Dec 7, 2021
9e8344c
NEON_PROXY_PKG_VERSION = '0.5.0'
vasiliy-zaznobin Dec 7, 2021
981c178
Merge remote-tracking branch 'origin/develop->master_0.5.0' into deve…
vasiliy-zaznobin Dec 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
#336 indexer refactoring (#340)
* cherrypick part of changes

* create indexer.py

* remove solana_receipts_update.py

* Fix inspection issues

* fix last issue

Co-authored-by: ivanl <[email protected]>
  • Loading branch information
2 people authored and vasiliy-zaznobin committed Dec 7, 2021
commit 01df8a4c9175c1ffc592de6cd916cc9a7f8fa890
4 changes: 4 additions & 0 deletions .buildkite/steps/deploy-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ docker run --rm -ti --network=container:proxy \
-e SOLANA_URL \
-e EXTRA_GAS=100000 \
-e NEW_USER_AIRDROP_AMOUNT=100 \
-e POSTGRES_DB=neon-db \
-e POSTGRES_USER=neon-proxy \
-e POSTGRES_PASSWORD=neon-proxy-pass \
-e POSTGRES_HOST=postgres \
--entrypoint ./proxy/deploy-test.sh \
${EXTRA_ARGS:-} \
$PROXY_IMAGE \
Expand Down
2 changes: 1 addition & 1 deletion proxy/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ services:
POSTGRES_PASSWORD: neon-proxy-pass
hostname: postgres
healthcheck:
test: [ CMD-SHELL, "pg_isready" ]
test: [ CMD-SHELL, "pg_isready -h postgres -p 5432" ]
interval: 5s
timeout: 10s
retries: 10
Expand Down
167 changes: 37 additions & 130 deletions proxy/indexer/solana_receipts_update.py → proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from proxy.indexer.indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
import base58
import rlp
import json
import os
import time
import logging
from solana.rpc.api import Client
from multiprocessing.dummy import Pool as ThreadPool
from typing import Dict, Union
from proxy.environment import solana_url, evm_loader_id


try:
Expand All @@ -17,16 +15,7 @@
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from .sql_dict import SQLDict


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]

UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16

class HolderStruct:
Expand Down Expand Up @@ -57,131 +46,34 @@ def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures,
self.slot = slot


class Indexer:
def __init__(self):
self.client = Client(solana_url)
class Indexer(IndexerBase):
def __init__(self,
solana_url,
evm_loader_id,
log_level = 'INFO'):
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level)

self.canceller = Canceller()
self.logs_db = LogDB()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
self.constants = SQLDict(tablename="constants")
self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
if 'last_block' not in self.constants:
self.constants['last_block'] = 0
self.blocked_storages = {}
self.counter_ = 0

def run(self, loop = True):
while (True):
try:
logger.debug("Start indexing")
self.gather_unknown_transactions()
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
except Exception as err:
logger.debug("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)


def gather_unknown_transactions(self):
poll_txs = set()
ordered_txs = []

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="confirmed")["result"]
maximum_slot = self.last_slot
minimal_slot = current_slot

percent = 0

counter = 0
while (continue_flag):
opts: Dict[str, Union[int, str]] = {}
if minimal_tx:
opts["before"] = minimal_tx
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", evm_loader_id, opts)
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
counter += 1

if len(result["result"]) == 0:
logger.debug("len(result['result']) == 0")
break

for tx in result["result"]:
solana_signature = tx["signature"]
slot = tx["slot"]

if solana_signature in HISTORY_START:
logger.debug(solana_signature)
continue_flag = False
break

ordered_txs.append(solana_signature)

if solana_signature not in self.transaction_receipts:
poll_txs.add(solana_signature)

if slot < minimal_slot:
minimal_slot = slot
minimal_tx = solana_signature

if slot > maximum_slot:
maximum_slot = slot

if slot < self.last_slot:
continue_flag = False
break

logger.debug("start getting receipts")
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self.get_tx_receipts, poll_txs)

if len(self.transaction_order):
index = 0
try:
index = ordered_txs.index(self.transaction_order[0])
except ValueError:
self.transaction_order = ordered_txs + self.transaction_order
else:
self.transaction_order = ordered_txs[:index] + self.transaction_order
else:
self.transaction_order = ordered_txs

self.last_slot = maximum_slot
self.current_slot = current_slot

self.counter_ = 0


def get_tx_receipts(self, solana_signature):
# trx = None
retry = True

while retry:
try:
trx = self.client.get_confirmed_transaction(solana_signature)['result']
self.transaction_receipts[solana_signature] = trx
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

self.counter_ += 1
if self.counter_ % 100 == 0:
logger.debug(self.counter_)

# return (solana_signature, trx)
def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}


def process_receipts(self):
Expand Down Expand Up @@ -209,7 +101,7 @@ def process_receipts(self):
if trx['transaction']['message']['instructions'] is not None:
for instruction in trx['transaction']['message']['instructions']:

if trx["transaction"]["message"]["accountKeys"][instruction["programIdIndex"]] != evm_loader_id:
if trx["transaction"]["message"]["accountKeys"][instruction["programIdIndex"]] != self.evm_loader_id:
continue

if check_error(trx):
Expand Down Expand Up @@ -574,12 +466,27 @@ def get_block(self, slot):
return (slot, block_hash)


def run_indexer():
def run_indexer(solana_url,
evm_loader_id,
log_level = 'DEBUG'):
logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s')
logger.setLevel(logging.DEBUG)
indexer = Indexer()
indexer.run(False)
logger.info(f"""Running indexer with params:
solana_url: {solana_url},
evm_loader_id: {evm_loader_id},
log_level: {log_level}""")

indexer = Indexer(solana_url,
evm_loader_id,
log_level)
indexer.run()


if __name__ == "__main__":
run_indexer()
solana_url = os.environ.get('SOLANA_URL', 'http://localhost:8899')
evm_loader_id = os.environ.get('EVM_LOADER_ID', '53DfF883gyixYNXnM7s5xhdeyV8mVk9T4i2hGV9vG9io')
log_level = os.environ.get('LOG_LEVEL', 'INFO')

run_indexer(solana_url,
evm_loader_id,
log_level)
149 changes: 149 additions & 0 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import os
import time
import logging
from solana.rpc.api import Client
from multiprocessing.dummy import Pool as ThreadPool
from typing import Dict, Union

try:
from sql_dict import SQLDict
except ImportError:
from .sql_dict import SQLDict


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]


log_levels = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARN': logging.WARN,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'FATAL': logging.FATAL,
'CRITICAL': logging.CRITICAL
}

class IndexerBase:
def __init__(self,
solana_url,
evm_loader_id,
log_level):
logger.setLevel(log_levels.get(log_level, logging.INFO))

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
self.counter_ = 0


def run(self):
while (True):
try:
self.process_functions()
except Exception as err:
logger.warning("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)


def process_functions(self):
logger.debug("Start indexing")
self.gather_unknown_transactions()


def gather_unknown_transactions(self):
poll_txs = set()
ordered_txs = []

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="confirmed")["result"]
maximum_slot = self.last_slot
minimal_slot = current_slot

counter = 0
while (continue_flag):
opts: Dict[str, Union[int, str]] = {}
if minimal_tx:
opts["before"] = minimal_tx
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
counter += 1

if len(result["result"]) == 0:
logger.debug("len(result['result']) == 0")
break

for tx in result["result"]:
solana_signature = tx["signature"]
slot = tx["slot"]

if solana_signature in HISTORY_START:
logger.debug(solana_signature)
continue_flag = False
break

ordered_txs.append(solana_signature)

if solana_signature not in self.transaction_receipts:
poll_txs.add(solana_signature)

if slot < minimal_slot:
minimal_slot = slot
minimal_tx = solana_signature

if slot > maximum_slot:
maximum_slot = slot

if slot < self.last_slot:
continue_flag = False
break

logger.debug("start getting receipts")
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self.get_tx_receipts, poll_txs)

if len(self.transaction_order):
index = 0
try:
index = ordered_txs.index(self.transaction_order[0])
except ValueError:
self.transaction_order = ordered_txs + self.transaction_order
else:
self.transaction_order = ordered_txs[:index] + self.transaction_order
else:
self.transaction_order = ordered_txs

self.last_slot = maximum_slot
self.current_slot = current_slot

self.counter_ = 0


def get_tx_receipts(self, solana_signature):
# trx = None
retry = True

while retry:
try:
trx = self.client.get_confirmed_transaction(solana_signature)['result']
self.transaction_receipts[solana_signature] = trx
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

self.counter_ += 1
if self.counter_ % 100 == 0:
logger.debug(self.counter_)

# return (solana_signature, trx)
Loading