Skip to content

Commit a587c3b

Browse files
committed
Merge remote-tracking branch 'origin/develop' into 425_use_solanas_block_height_as_neon_block_number
2 parents 4c595af + 43f6e8b commit a587c3b

File tree

10 files changed

+534
-360
lines changed

10 files changed

+534
-360
lines changed

.buildkite/pipeline.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ steps:
1414
- "solana.log"
1515
- "measurements.log"
1616
- "evm_loader.log"
17+
- "faucet.log"
18+
- "airdropper.log"
1719

1820
- wait
1921

proxy/indexer/airdropper.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(self,
2828
neon_decimals = 9,
2929
start_slot = 0):
3030
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level, start_slot)
31+
self.latest_processed_slot = 0
3132

3233
# collection of eth-address-to-create-accout-trx mappings
3334
# for every addresses that was already funded with airdrop
@@ -96,7 +97,7 @@ def _airdrop_to(self, create_acc):
9697
if not resp.ok:
9798
logger.warning(f'Failed to airdrop: {resp.status_code}')
9899
return
99-
100+
100101
self.airdrop_ready[eth_address] = create_acc
101102

102103

@@ -144,20 +145,12 @@ def process_functions(self):
144145

145146

146147
def process_receipts(self):
147-
counter = 0
148-
for signature in self.transaction_order:
149-
counter += 1
150-
if signature in self.transaction_receipts:
151-
trx = self.transaction_receipts[signature]
152-
if trx is None:
153-
logger.error("trx is None")
154-
del self.transaction_receipts[signature]
155-
continue
156-
if 'slot' not in trx:
157-
logger.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
158-
exit()
159-
if trx['transaction']['message']['instructions'] is not None:
160-
self.process_trx_airdropper_mode(trx)
148+
max_slot = 0
149+
for slot, _, trx in self.transaction_receipts.get_trxs(self.latest_processed_slot, reverse=True):
150+
max_slot = max(max_slot, slot)
151+
if trx['transaction']['message']['instructions'] is not None:
152+
self.process_trx_airdropper_mode(trx)
153+
self.latest_processed_slot = max(self.latest_processed_slot, max_slot)
161154

162155

163156
def run_airdropper(solana_url,

proxy/indexer/indexer.py

Lines changed: 290 additions & 270 deletions
Large diffs are not rendered by default.

proxy/indexer/indexer_base.py

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77

88
try:
99
from sql_dict import SQLDict
10+
from trx_receipts_storage import TrxReceiptsStorage
1011
except ImportError:
1112
from .sql_dict import SQLDict
13+
from .trx_receipts_storage import TrxReceiptsStorage
1214

1315

1416
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
@@ -40,11 +42,19 @@ def __init__(self,
4042

4143
self.evm_loader_id = evm_loader_id
4244
self.client = Client(solana_url)
43-
self.transaction_receipts = SQLDict(tablename="known_transactions")
45+
self.transaction_receipts = TrxReceiptsStorage('transaction_receipts', log_level)
4446
self.last_slot = start_slot
4547
self.current_slot = 0
46-
self.transaction_order = []
4748
self.counter_ = 0
49+
self.max_known_tx = self.transaction_receipts.max_known_trx()
50+
self._move_data_from_old_table()
51+
52+
53+
def _move_data_from_old_table(self):
54+
if self.transaction_receipts.size() == 0:
55+
transaction_receipts_old = SQLDict(tablename="known_transactions")
56+
for signature, trx in transaction_receipts_old.iteritems():
57+
self._add_trx(signature, trx)
4858

4959

5060
def run(self):
@@ -53,6 +63,7 @@ def run(self):
5363
self.process_functions()
5464
except Exception as err:
5565
logger.warning("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err)
66+
time.sleep(1.0)
5667

5768

5869
def process_functions(self):
@@ -62,82 +73,72 @@ def process_functions(self):
6273

6374
def gather_unknown_transactions(self):
6475
poll_txs = set()
65-
ordered_txs = []
6676

6777
minimal_tx = None
6878
continue_flag = True
6979
current_slot = self.client.get_slot(commitment="confirmed")["result"]
70-
maximum_slot = self.last_slot
71-
minimal_slot = current_slot
80+
81+
max_known_tx = self.max_known_tx
7282

7383
counter = 0
7484
while (continue_flag):
75-
opts: Dict[str, Union[int, str]] = {}
76-
if minimal_tx:
77-
opts["before"] = minimal_tx
78-
opts["commitment"] = "confirmed"
79-
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
80-
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(result["result"])))
85+
results = self._get_signatures(minimal_tx, self.max_known_tx[1])
86+
logger.debug("{:>3} get_signatures_for_address {}".format(counter, len(results)))
8187
counter += 1
8288

83-
if len(result["result"]) == 0:
84-
logger.debug("len(result['result']) == 0")
89+
if len(results) == 0:
90+
logger.debug("len(results) == 0")
8591
break
8692

87-
for tx in result["result"]:
93+
minimal_tx = results[-1]["signature"]
94+
max_tx = (results[0]["slot"], results[0]["signature"])
95+
max_known_tx = max(max_known_tx, max_tx)
96+
97+
for tx in results:
8898
solana_signature = tx["signature"]
8999
slot = tx["slot"]
90100

101+
if slot < self.last_slot:
102+
continue_flag = False
103+
break
104+
91105
if solana_signature in HISTORY_START:
92106
logger.debug(solana_signature)
93107
continue_flag = False
94108
break
95109

96-
ordered_txs.append(solana_signature)
97-
98-
if solana_signature not in self.transaction_receipts:
110+
if not self.transaction_receipts.contains(slot, solana_signature):
99111
poll_txs.add(solana_signature)
100112

101-
if slot < minimal_slot:
102-
minimal_slot = slot
103-
minimal_tx = solana_signature
104-
105-
if slot > maximum_slot:
106-
maximum_slot = slot
107-
108-
if slot < self.last_slot:
109-
continue_flag = False
110-
break
111-
112113
logger.debug("start getting receipts")
113114
pool = ThreadPool(PARALLEL_REQUESTS)
114-
pool.map(self.get_tx_receipts, poll_txs)
115-
116-
if len(self.transaction_order):
117-
index = 0
118-
try:
119-
index = ordered_txs.index(self.transaction_order[0])
120-
except ValueError:
121-
self.transaction_order = ordered_txs + self.transaction_order
122-
else:
123-
self.transaction_order = ordered_txs[:index] + self.transaction_order
124-
else:
125-
self.transaction_order = ordered_txs
115+
pool.map(self._get_tx_receipts, poll_txs)
126116

127-
self.last_slot = maximum_slot
128117
self.current_slot = current_slot
129-
130118
self.counter_ = 0
119+
logger.debug(max_known_tx)
120+
self.max_known_tx = max_known_tx
121+
122+
123+
def _get_signatures(self, before, until):
124+
opts: Dict[str, Union[int, str]] = {}
125+
if until is not None:
126+
opts["until"] = until
127+
if before is not None:
128+
opts["before"] = before
129+
opts["commitment"] = "confirmed"
130+
result = self.client._provider.make_request("getSignaturesForAddress", self.evm_loader_id, opts)
131+
return result['result']
131132

132133

133-
def get_tx_receipts(self, solana_signature):
134+
def _get_tx_receipts(self, solana_signature):
134135
# trx = None
135136
retry = True
136137

137138
while retry:
138139
try:
139140
trx = self.client.get_confirmed_transaction(solana_signature)['result']
140-
self.transaction_receipts[solana_signature] = trx
141+
self._add_trx(solana_signature, trx)
141142
retry = False
142143
except Exception as err:
143144
logger.debug(err)
@@ -147,4 +148,16 @@ def get_tx_receipts(self, solana_signature):
147148
if self.counter_ % 100 == 0:
148149
logger.debug(self.counter_)
149150

150-
# return (solana_signature, trx)
151+
152+
def _add_trx(self, solana_signature, trx):
153+
if trx is not None:
154+
add = False
155+
for instruction in trx['transaction']['message']['instructions']:
156+
if trx["transaction"]["message"]["accountKeys"][instruction["programIdIndex"]] == self.evm_loader_id:
157+
add = True
158+
if add:
159+
logger.debug((trx['slot'], solana_signature))
160+
self.transaction_receipts.add_trx(trx['slot'], solana_signature, trx)
161+
else:
162+
logger.debug(f"trx is None {solana_signature}")
163+

proxy/indexer/pg_common.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import psycopg2
2+
import os
3+
4+
POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
5+
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
6+
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
7+
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
8+
9+
try:
10+
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
11+
except ImportError:
12+
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
13+
14+
15+
def encode(obj):
16+
"""Serialize an object using pickle to a binary format accepted by SQLite."""
17+
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))
18+
19+
20+
def decode(obj):
21+
"""Deserialize objects retrieved from SQLite."""
22+
return loads(bytes(obj))
23+
24+
25+
def dummy(obj):
26+
"""Does nothing"""
27+
return obj

proxy/indexer/sql_dict.py

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,12 @@
11
import psycopg2
2-
import os
32
import logging
43
from collections.abc import MutableMapping
5-
6-
POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
7-
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
8-
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy-pass")
9-
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
10-
11-
try:
12-
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
13-
except ImportError:
14-
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
15-
4+
from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
5+
, POSTGRES_HOST, encode, decode, dummy
166

177
logger = logging.getLogger(__name__)
188
logger.setLevel(logging.DEBUG)
199

20-
21-
def encode(obj):
22-
"""Serialize an object using pickle to a binary format accepted by SQLite."""
23-
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))
24-
25-
26-
def decode(obj):
27-
"""Deserialize objects retrieved from SQLite."""
28-
return loads(bytes(obj))
29-
30-
31-
def dummy(obj):
32-
"""Does nothing"""
33-
return obj
34-
35-
3610
class SQLDict(MutableMapping):
3711
"""Serialize an object using pickle to a binary format accepted by SQLite."""
3812

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import psycopg2
2+
import os
3+
import logging
4+
from proxy.indexer.pg_common import POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD\
5+
, POSTGRES_HOST, encode, decode, dummy
6+
7+
logger = logging.getLogger(__name__)
8+
9+
class TrxReceiptsStorage:
10+
def __init__(self, table_name, log_level = logging.DEBUG):
11+
self.table_name = table_name
12+
logger.setLevel(log_level)
13+
self.conn = psycopg2.connect(
14+
dbname=POSTGRES_DB,
15+
user=POSTGRES_USER,
16+
password=POSTGRES_PASSWORD,
17+
host=POSTGRES_HOST
18+
)
19+
20+
self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
21+
cur = self.conn.cursor()
22+
cur.execute(f'''
23+
CREATE TABLE IF NOT EXISTS
24+
{self.table_name} (
25+
slot BIGINT,
26+
signature VARCHAR(88),
27+
trx BYTEA,
28+
PRIMARY KEY (slot, signature)
29+
)
30+
''')
31+
32+
def clear(self):
33+
cur = self.conn.cursor()
34+
cur.execute(f'DELETE FROM {self.table_name}')
35+
36+
def size(self):
37+
cur = self.conn.cursor()
38+
cur.execute(f'SELECT COUNT(*) FROM {self.table_name}')
39+
rows = cur.fetchone()[0]
40+
return rows if rows is not None else 0
41+
42+
def max_known_trx(self):
43+
cur = self.conn.cursor()
44+
cur.execute(f'SELECT slot, signature FROM {self.table_name} ORDER BY slot DESC, signature DESC LIMIT 1')
45+
row = cur.fetchone()
46+
if row is not None:
47+
return (row[0], row[1])
48+
return (0, None) #table empty - return default value
49+
50+
def add_trx(self, slot, signature, trx):
51+
bin_trx = encode(trx)
52+
cur = self.conn.cursor()
53+
cur.execute(f'''
54+
INSERT INTO {self.table_name} (slot, signature, trx)
55+
VALUES ({slot},%s,%s)
56+
ON CONFLICT (slot, signature)
57+
DO UPDATE SET
58+
trx = EXCLUDED.trx
59+
''',
60+
(signature, bin_trx)
61+
)
62+
63+
def contains(self, slot, signature):
64+
cur = self.conn.cursor()
65+
cur.execute(f'SELECT 1 FROM {self.table_name} WHERE slot = %s AND signature = %s', (slot, signature,))
66+
return cur.fetchone() is not None
67+
68+
def get_trxs(self, start_slot = 0, reverse = False):
69+
cur = self.conn.cursor()
70+
order = 'DESC' if reverse else 'ASC'
71+
cur.execute(f'SELECT slot, signature, trx FROM {self.table_name} WHERE slot >= {start_slot} ORDER BY slot {order}')
72+
rows = cur.fetchall()
73+
for row in rows:
74+
yield row[0], row[1], decode(row[2])

proxy/plugin/solana_rest_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
modelInstanceLock = threading.Lock()
4747
modelInstance = None
4848

49-
NEON_PROXY_PKG_VERSION = '0.5.2-dev'
49+
NEON_PROXY_PKG_VERSION = '0.5.4-dev'
5050
NEON_PROXY_REVISION = 'NEON_PROXY_REVISION_TO_BE_REPLACED'
5151

5252

0 commit comments

Comments
 (0)