Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5984c19
cherrypick part of changes
Dec 1, 2021
613a469
create indexer.py
Dec 1, 2021
c0d0e6c
remove solana_receipts_update.py
Dec 1, 2021
3f6b5c4
Cherry pick files from old branch
Dec 1, 2021
0790298
add requirement
Dec 1, 2021
340c854
fix refactoring issues
Dec 1, 2021
7449b38
Fix inspection issues
Dec 1, 2021
b3dacfa
fix last issue
Dec 1, 2021
a50cd47
Merge branch '336_indexer_refactoring' into 337_сreate_base_airdroppe…
Dec 1, 2021
2b8f879
Merge remote-tracking branch 'origin/develop' into 337_сreate_base_ai…
Dec 2, 2021
f51f2ed
simplify tests
Dec 2, 2021
6678924
add test
Dec 2, 2021
5d454b7
Merge remote-tracking branch 'origin/develop' into 337_сreate_base_ai…
Dec 3, 2021
add136a
add price provider
Dec 1, 2021
9a4be44
fix PriceProvider, add test
Dec 1, 2021
07aaca8
Add tests. Check worn on all nets
Dec 1, 2021
7a46c12
refactoring
Dec 1, 2021
2fc1424
integrate price_provider into airdropper
Dec 2, 2021
d157d67
integrate price provider
Dec 2, 2021
8a6abfd
use new faucet method
Dec 3, 2021
3d4dec9
add new parameter to airdropper main
Dec 3, 2021
5c29832
Test discriptions for airdropper
Dec 3, 2021
6a4efdc
Comments for price provider tests
Dec 3, 2021
14aeeed
remove unnecessary comment
Dec 3, 2021
bd35791
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 3, 2021
ff1f557
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 6, 2021
c28ca8e
fix error
Dec 6, 2021
8c68755
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 7, 2021
5325895
Merge remote-tracking branch 'origin/develop' into 338_create_sol_pri…
Dec 8, 2021
a1cbad2
fix airdropper run
Dec 10, 2021
4ed6d78
remove duplicated code
Dec 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
cherrypick part of changes
  • Loading branch information
ivanl committed Dec 1, 2021
commit 5984c1910af83f486a4d601accb495918ec1f976
4 changes: 4 additions & 0 deletions .buildkite/steps/deploy-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ docker run --rm -ti --network=container:proxy \
-e SOLANA_URL \
-e EXTRA_GAS=100000 \
-e NEW_USER_AIRDROP_AMOUNT=100 \
-e POSTGRES_DB=neon-db \
-e POSTGRES_USER=neon-proxy \
-e POSTGRES_PASSWORD=neon-proxy-pass \
-e POSTGRES_HOST=postgres \
--entrypoint ./proxy/deploy-test.sh \
${EXTRA_ARGS:-} \
$PROXY_IMAGE \
Expand Down
2 changes: 1 addition & 1 deletion proxy/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ services:
POSTGRES_PASSWORD: neon-proxy-pass
hostname: postgres
healthcheck:
test: [ CMD-SHELL, "pg_isready" ]
test: [ CMD-SHELL, "pg_isready -h postgres -p 5432" ]
interval: 5s
timeout: 10s
retries: 10
Expand Down
202 changes: 202 additions & 0 deletions proxy/indexer/indexer_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import base58
import os
import time
import logging
from solana.rpc.api import Client
from multiprocessing.dummy import Pool as ThreadPool
from typing import Dict, Union

try:
from utils import Canceller
from sql_dict import SQLDict
except ImportError:
from .utils import Canceller
from .sql_dict import SQLDict


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

DEVNET_HISTORY_START = "7BdwyUQ61RUZP63HABJkbW66beLk22tdXnP69KsvQBJekCPVaHoJY47Rw68b3VV1UbQNHxX3uxUSLfiJrfy2bTn"
HISTORY_START = [DEVNET_HISTORY_START]

UPDATE_BLOCK_COUNT = PARALLEL_REQUESTS * 16

log_levels = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARN': logging.WARN,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'FATAL': logging.FATAL,
'CRITICAL': logging.CRITICAL
}

class IndexerBase:
def __init__(self,
solana_url,
evm_loader_id,
log_level):
self.evm_loader_id = evm_loader_id
self.client = Client(solana_url)
self.canceller = Canceller()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.constants = SQLDict(tablename="constants")

logger.setLevel(log_levels.get(log_level, logging.INFO))

self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
if 'last_block' not in self.constants:
self.constants['last_block'] = 0
self.blocked_storages = {}
self.counter_ = 0


def run(self):
while (True):
try:
logger.debug("Start indexing")
self.gather_unknown_transactions()
logger.debug("Process receipts")
self.process_receipts()
logger.debug("Start getting blocks")
self.gather_blocks()
logger.debug("Unlock accounts")
self.canceller.unlock_accounts(self.blocked_storages)
self.blocked_storages = {}
except Exception as err:
logger.debug("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)


def gather_unknown_transactions(self):
poll_txs = set()
ordered_txs = []

minimal_tx = None
continue_flag = True
current_slot = self.client.get_slot(commitment="confirmed")["result"]
maximum_slot = self.last_slot
minimal_slot = current_slot

counter = 0
while (continue_flag):
opts: Dict[str, Union[int, str]] = {}
if minimal_tx:
opts["before"] = minimal_tx
opts["commitment"] = "confirmed"
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
counter += 1

if len(result["result"]) == 0:
logger.debug("len(result['result']) == 0")
break

for tx in result["result"]:
solana_signature = tx["signature"]
slot = tx["slot"]

if solana_signature in HISTORY_START:
logger.debug(solana_signature)
continue_flag = False
break

ordered_txs.append(solana_signature)

if solana_signature not in self.transaction_receipts:
poll_txs.add(solana_signature)

if slot < minimal_slot:
minimal_slot = slot
minimal_tx = solana_signature

if slot > maximum_slot:
maximum_slot = slot

if slot < self.last_slot:
continue_flag = False
break

logger.debug("start getting receipts")
pool = ThreadPool(PARALLEL_REQUESTS)
pool.map(self.get_tx_receipts, poll_txs)

if len(self.transaction_order):
index = 0
try:
index = ordered_txs.index(self.transaction_order[0])
except ValueError:
self.transaction_order = ordered_txs + self.transaction_order
else:
self.transaction_order = ordered_txs[:index] + self.transaction_order
else:
self.transaction_order = ordered_txs

self.last_slot = maximum_slot
self.current_slot = current_slot

self.counter_ = 0


def get_tx_receipts(self, solana_signature):
# trx = None
retry = True

while retry:
try:
trx = self.client.get_confirmed_transaction(solana_signature)['result']
self.transaction_receipts[solana_signature] = trx
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

self.counter_ += 1
if self.counter_ % 100 == 0:
logger.debug(self.counter_)

# return (solana_signature, trx)


def process_receipts(self): raise Exception('NotImplemented')

def gather_blocks(self):
max_slot = self.client.get_slot(commitment="recent")["result"]

last_block = self.constants['last_block']
if last_block + UPDATE_BLOCK_COUNT < max_slot:
max_slot = last_block + UPDATE_BLOCK_COUNT
slots = self.client._provider.make_request("getBlocks", last_block, max_slot, {"commitment": "confirmed"})["result"]

pool = ThreadPool(PARALLEL_REQUESTS)
results = pool.map(self.get_block, slots)

for block_result in results:
(slot, block_hash) = block_result
self.blocks_by_hash[block_hash] = slot

self.constants['last_block'] = max_slot


def get_block(self, slot):
retry = True

while retry:
try:
block = self.client._provider.make_request("getBlock", slot, {"commitment":"confirmed", "transactionDetails":"none", "rewards":False})['result']
block_hash = '0x' + base58.b58decode(block['blockhash']).hex()
retry = False
except Exception as err:
logger.debug(err)
time.sleep(1)

return (slot, block_hash)




7 changes: 5 additions & 2 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from .http.handler import HttpProtocolHandler

from multiprocessing import Process
from .indexer.solana_receipts_update import run_indexer
from .indexer.indexer import run_indexer
from proxy.environment import solana_url, evm_loader_id

logger = logging.getLogger(__name__)

Expand All @@ -45,7 +46,9 @@ def delete_pid_file(self) -> None:
os.remove(self.flags.pid_file)

def __enter__(self) -> 'Proxy':
self.indexer = Process(target=run_indexer)
self.indexer = Process(target=run_indexer,
args=(solana_url,
evm_loader_id,))
self.indexer.start()
self.acceptors = AcceptorPool(
flags=self.flags,
Expand Down