Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
5a3e9a6
Use neonlabsorg/evm_loader:stable image for build
Oct 12, 2021
e59efe2
Merge remote-tracking branch 'origin/develop'
Oct 19, 2021
44dd154
Merge remote-tracking branch 'origin/develop'
Oct 21, 2021
21d20a1
Launch all uniswap tests for master branch
vasiliy-zaznobin Nov 16, 2021
4e03e73
Merge remote-tracking branch 'origin/develop'
Nov 16, 2021
e496b27
VERSION = (0, 4, 0)
vasiliy-zaznobin Nov 16, 2021
fe41d64
EVM_LOADER_REVISION:=v0.4.0
vasiliy-zaznobin Nov 16, 2021
5467c50
EVM_LOADER_REVISION:=stable
vasiliy-zaznobin Nov 16, 2021
e15d847
Merge remote-tracking branch 'origin/develop'
vasiliy-zaznobin Dec 7, 2021
2416591
NEON_PROXY_PKG_VERSION = '0.5.0'
vasiliy-zaznobin Dec 7, 2021
168e428
VERSION = (0, 4, 0)
vasiliy-zaznobin Nov 16, 2021
3125645
EVM_LOADER_REVISION:=v0.4.0
vasiliy-zaznobin Nov 16, 2021
07f175d
EVM_LOADER_REVISION:=stable
vasiliy-zaznobin Nov 16, 2021
0bbb26b
289 implement eth get storage at (#298)
ivandzen Nov 18, 2021
7841b07
#256 create and airdrop eth account (#259)
rozhkovdmitrii Nov 18, 2021
ce05c81
#305 Remove extra args from ether2program (#309)
rozhkovdmitrii Nov 19, 2021
6ca860a
Change default logDB postgress pass (#317)
rozhkovdmitrii Nov 23, 2021
b3b322f
#311 Create account and airdrop on gas estimation if it's preset
rozhkovdmitrii Nov 24, 2021
c860ed0
neonbals/neon-evm#371 Update STORAGE_ACCOUNT_INFO_LAYOUT (#315)
sinev-valentine Nov 24, 2021
9bb3f89
319 add neon cli version handler (#325)
vasiliy-zaznobin Nov 24, 2021
081bc75
fix storage account check (#327)
sinev-valentine Nov 24, 2021
7845ca9
#320 add neon proxy version handler (#323)
vasiliy-zaznobin Nov 25, 2021
f299c57
313 concurrent execution of solana program dump (#314)
ivandzen Nov 25, 2021
0a5dc2d
fix scripts (#329)
ivandzen Nov 25, 2021
7964cad
#318 JSON_RPC "params" field may be omitted (#322)
mich-master Nov 26, 2021
01df8a4
#336 indexer refactoring (#340)
ivandzen Dec 2, 2021
d9a8e51
#333 fix indexer errors (#334)
otselnik Dec 2, 2021
f4b178c
#291 Proxy refactoring (#324)
otselnik Dec 2, 2021
5ced809
#337 сreate base airdropper service (#343)
ivandzen Dec 3, 2021
ae7b392
#337 fix running airdropper (#347)
ivandzen Dec 6, 2021
9feca6a
#351 fix canceller droping running transactions (#352)
otselnik Dec 6, 2021
2f0a0df
349 improve neon proxy logging to filter a request and the correspond…
vasiliy-zaznobin Dec 6, 2021
3b36864
#349 Fix using log sending solana transaction (#353)
vasiliy-zaznobin Dec 6, 2021
6bb8509
#295 iterative execution (#332)
otselnik Dec 6, 2021
56044ea
#354 Check result for errors (#355)
otselnik Dec 7, 2021
0bad34c
#360 pass transaction too large upper (#361)
otselnik Dec 7, 2021
9e8344c
NEON_PROXY_PKG_VERSION = '0.5.0'
vasiliy-zaznobin Dec 7, 2021
981c178
Merge remote-tracking branch 'origin/develop->master_0.5.0' into deve…
vasiliy-zaznobin Dec 7, 2021
4c87c95
Merge pull request #362 from neonlabsorg/develop->master_0.5.0
vasiliy-zaznobin Dec 7, 2021
2ccfcce
Merge remote-tracking branch 'origin/develop' into develop->master_0.…
vasiliy-zaznobin Dec 8, 2021
28881d2
Merge pull request #363 from neonlabsorg/develop->master_0.5.0-rc2
vasiliy-zaznobin Dec 8, 2021
3771577
Merge remote-tracking branch 'origin/develop' into develop->master_0.…
vasiliy-zaznobin Dec 13, 2021
1e4c4ab
Merge pull request #372 from neonlabsorg/develop->master_0.5.0_rc3
vasiliy-zaznobin Dec 13, 2021
cacf6f8
Master 0.5.0 rc4 (#380)
vasiliy-zaznobin Dec 16, 2021
5ce4d3d
0.5.0-rc4 (#386)
vasiliy-zaznobin Dec 16, 2021
6a3dbd9
Neon-proxy/v0.5.0 (#388)
vasiliy-zaznobin Dec 17, 2021
424cf24
Merge remote-tracking branch 'origin/master' into master_v0.5.0->develop
vasiliy-zaznobin Dec 17, 2021
5aef94b
Neon-proxy/v0.5.1-dev
vasiliy-zaznobin Dec 17, 2021
03c051b
stable->latest
vasiliy-zaznobin Dec 17, 2021
3e43911
VERSION = (0, 5, 1)
vasiliy-zaznobin Dec 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
#336 indexer refactoring (#340)
* cherrypick part of changes

* create indexer.py

* remove solana_receipts_update.py

* Fix inspection issues

* fix last issue

Co-authored-by: ivanl <[email protected]>
  • Loading branch information
2 people authored and vasiliy-zaznobin committed Dec 7, 2021
commit 01df8a4c9175c1ffc592de6cd916cc9a7f8fa890
4 changes: 4 additions & 0 deletions .buildkite/steps/deploy-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ docker run --rm -ti --network=container:proxy \
-e SOLANA_URL \
-e EXTRA_GAS=100000 \
-e NEW_USER_AIRDROP_AMOUNT=100 \
-e POSTGRES_DB=neon-db \
-e POSTGRES_USER=neon-proxy \
-e POSTGRES_PASSWORD=neon-proxy-pass \
-e POSTGRES_HOST=postgres \
--entrypoint ./proxy/deploy-test.sh \
${EXTRA_ARGS:-} \
$PROXY_IMAGE \
Expand Down
2 changes: 1 addition & 1 deletion proxy/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ services:
POSTGRES_PASSWORD: neon-proxy-pass
hostname: postgres
healthcheck:
test: [ CMD-SHELL, "pg_isready" ]
test: [ CMD-SHELL, "pg_isready -h postgres -p 5432" ]
interval: 5s
timeout: 10s
retries: 10
Expand Down
167 changes: 37 additions & 130 deletions proxy/indexer/solana_receipts_update.py → proxy/indexer/indexer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from proxy.indexer.indexer_base import logger, IndexerBase, PARALLEL_REQUESTS
import base58
import rlp
import json
import os
import time
import logging
from solana.rpc.api import Client
from multiprocessing.dummy import Pool as ThreadPool
from typing import Dict, Union
from proxy.environment import solana_url, evm_loader_id


try:
Expand All @@ -17,16 +15,7 @@
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from .sql_dict import SQLDict


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60"))

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]

UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16

class HolderStruct:
Expand Down Expand Up @@ -57,131 +46,34 @@ def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures,
self.slot = slot


class Indexer:
def __init__(self):
self.client = Client(solana_url)
class Indexer(IndexerBase):
def __init__(self,
solana_url,
evm_loader_id,
log_level = 'INFO'):
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level)

self.canceller = Canceller()
self.logs_db = LogDB()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
self.constants = SQLDict(tablename="constants")
self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
if 'last_block' not in self.constants:
self.constants['last_block'] = 0
self.blocked_storages = {}
self.counter_ = 0

def run(self, loop = True):
while (True):
try:
logger.debug("Start indexing")
self.gather_unknown_transactions()
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
except Exception as err:
logger.debug("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)


def gather_unknown_transactions(self):
poll_txs = set()
ordered_txs = []

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="confirmed")["result"]
maximum_slot = self.last_slot
minimal_slot = current_slot

percent = 0

counter = 0
while (continue_flag):
opts: Dict[str, Union[int, str]] = {}
if minimal_tx:
opts["before"] = minimal_tx
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", evm_loader_id, opts)
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
counter += 1

if len(result["result"]) == 0:
logger.debug("len(result['result']) == 0")
break

for tx in result["result"]:
solana_signature = tx["signature"]
slot = tx["slot"]

if solana_signature in HISTORY_START:
logger.debug(solana_signature)
continue_flag = False
break

ordered_txs.append(solana_signature)

if solana_signature not in self.transaction_receipts:
poll_txs.add(solana_signature)

if slot < minimal_slot:
minimal_slot = slot
minimal_tx = solana_signature

if slot > maximum_slot:
maximum_slot = slot

if slot < self.last_slot:
continue_flag = False
break

logger.debug("start getting receipts")
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self.get_tx_receipts, poll_txs)

if len(self.transaction_order):
index = 0
try:
index = ordered_txs.index(self.transaction_order[0])
except ValueError:
self.transaction_order = ordered_txs + self.transaction_order
else:
self.transaction_order = ordered_txs[:index] + self.transaction_order
else:
self.transaction_order = ordered_txs

self.last_slot = maximum_slot
self.current_slot = current_slot

self.counter_ = 0


def get_tx_receipts(self, solana_signature):
# trx = None
retry = True

while retry:
try:
trx = self.client.get_confirmed_transaction(solana_signature)['result']
self.transaction_receipts[solana_signature] = trx
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

self.counter_ += 1
if self.counter_ % 100 == 0:
logger.debug(self.counter_)

# return (solana_signature, trx)
def process_functions(self):
IndexerBase.process_functions(self)
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}


def process_receipts(self):
Expand Down Expand Up @@ -209,7 +101,7 @@ def process_receipts(self):
if trx['transaction']['message']['instructions'] is not None:
for instruction in trx['transaction']['message']['instructions']:

if trx["transaction"]["message"]["accountKeys"][instruction["programIdIndex"]] != evm_loader_id:
if trx["transaction"]["message"]["accountKeys"][instruction["programIdIndex"]] != self.evm_loader_id:
continue

if check_error(trx):
Expand Down Expand Up @@ -574,12 +466,27 @@ def get_block(self, slot):
return (slot, block_hash)


def run_indexer():
def run_indexer(solana_url,
evm_loader_id,
log_level = 'DEBUG'):
logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s')
logger.setLevel(logging.DEBUG)
indexer = Indexer()
indexer.run(False)
logger.info(f"""Running indexer with params:
solana_url: {solana_url},
evm_loader_id: {evm_loader_id},
log_level: {log_level}""")

indexer = Indexer(solana_url,
evm_loader_id,
log_level)
indexer.run()


if __name__ == "__main__":
run_indexer()
solana_url = os.environ.get('SOLANA_URL', 'http://localhost:8899')
evm_loader_id = os.environ.get('EVM_LOADER_ID', '53DfF883gyixYNXnM7s5xhdeyV8mVk9T4i2hGV9vG9io')
log_level = os.environ.get('LOG_LEVEL', 'INFO')

run_indexer(solana_url,
evm_loader_id,
log_level)
149 changes: 149 additions & 0 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import os
import time
import logging
from solana.rpc.api import Client
from multiprocessing.dummy import Pool as ThreadPool
from typing import Dict, Union

try:
from sql_dict import SQLDict
except ImportError:
from .sql_dict import SQLDict


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]


log_levels = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARN': logging.WARN,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'FATAL': logging.FATAL,
'CRITICAL': logging.CRITICAL
}

class IndexerBase:
def __init__(self,
solana_url,
evm_loader_id,
log_level):
logger.setLevel(log_levels.get(log_level, logging.INFO))

self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
self.counter_ = 0


def run(self):
while (True):
try:
self.process_functions()
except Exception as err:
logger.warning("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)


def process_functions(self):
logger.debug("Start indexing")
self.gather_unknown_transactions()


def gather_unknown_transactions(self):
poll_txs = set()
ordered_txs = []

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="confirmed")["result"]
maximum_slot = self.last_slot
minimal_slot = current_slot

counter = 0
while (continue_flag):
opts: Dict[str, Union[int, str]] = {}
if minimal_tx:
opts["before"] = minimal_tx
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
counter += 1

if len(result["result"]) == 0:
logger.debug("len(result['result']) == 0")
break

for tx in result["result"]:
solana_signature = tx["signature"]
slot = tx["slot"]

if solana_signature in HISTORY_START:
logger.debug(solana_signature)
continue_flag = False
break

ordered_txs.append(solana_signature)

if solana_signature not in self.transaction_receipts:
poll_txs.add(solana_signature)

if slot < minimal_slot:
minimal_slot = slot
minimal_tx = solana_signature

if slot > maximum_slot:
maximum_slot = slot

if slot < self.last_slot:
continue_flag = False
break

logger.debug("start getting receipts")
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self.get_tx_receipts, poll_txs)

if len(self.transaction_order):
index = 0
try:
index = ordered_txs.index(self.transaction_order[0])
except ValueError:
self.transaction_order = ordered_txs + self.transaction_order
else:
self.transaction_order = ordered_txs[:index] + self.transaction_order
else:
self.transaction_order = ordered_txs

self.last_slot = maximum_slot
self.current_slot = current_slot

self.counter_ = 0


def get_tx_receipts(self, solana_signature):
# trx = None
retry = True

while retry:
try:
trx = self.client.get_confirmed_transaction(solana_signature)['result']
self.transaction_receipts[solana_signature] = trx
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

self.counter_ += 1
if self.counter_ % 100 == 0:
logger.debug(self.counter_)

# return (solana_signature, trx)
Loading