Skip to content

Commit d5a5015

Browse files
ivandzenivanl
andauthored
#337 сreate base airdropper service (#343)
* cherrypick part of changes * create indexer.py * remove solana_receipts_update.py * Cherry pick files from old branch * add requirement * fix refactoring issues * Fix inspection issues * fix last issue * simplify tests * add test Co-authored-by: ivanl <[email protected]>
1 parent e74e568 commit d5a5015

File tree

5 files changed

+871
-0
lines changed

5 files changed

+871
-0
lines changed

proxy/indexer/airdropper.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
from proxy.indexer.indexer_base import IndexerBase, logger
2+
import os
3+
import requests
4+
import base58
5+
import json
6+
import logging
7+
8+
try:
9+
from utils import check_error
10+
from sql_dict import SQLDict
11+
except ImportError:
12+
from .utils import check_error
13+
from .sql_dict import SQLDict
14+
15+
class Airdropper(IndexerBase):
16+
def __init__(self,
17+
solana_url,
18+
evm_loader_id,
19+
faucet_url = '',
20+
wrapper_whitelist = [],
21+
airdrop_amount = 10,
22+
log_level = 'INFO'):
23+
IndexerBase.__init__(self, solana_url, evm_loader_id, log_level)
24+
25+
# collection of eth-address-to-create-accout-trx mappings
26+
# for every addresses that was already funded with airdrop
27+
self.airdrop_ready = SQLDict(tablename="airdrop_ready")
28+
self.wrapper_whitelist = wrapper_whitelist
29+
self.airdrop_amount = airdrop_amount
30+
self.faucet_url = faucet_url
31+
32+
33+
# helper function checking if given contract address is in whitelist
34+
def _is_allowed_wrapper_contract(self, contract_addr):
35+
return contract_addr in self.wrapper_whitelist
36+
37+
38+
# helper function checking if given 'create account' corresponds to 'create erc20 token account' instruction
39+
def _check_create_instr(self, account_keys, create_acc, create_token_acc):
40+
# Must use the same Ethereum account
41+
if account_keys[create_acc['accounts'][1]] != account_keys[create_token_acc['accounts'][2]]:
42+
return False
43+
# Must use the same token program
44+
if account_keys[create_acc['accounts'][5]] != account_keys[create_token_acc['accounts'][6]]:
45+
return False
46+
# Token program must be system token program
47+
if account_keys[create_acc['accounts'][5]] != 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA':
48+
return False
49+
# CreateERC20TokenAccount instruction must use ERC20-wrapper from whitelist
50+
if not self._is_allowed_wrapper_contract(account_keys[create_token_acc['accounts'][3]]):
51+
return False
52+
return True
53+
54+
55+
# helper function checking if given 'create erc20 token account' corresponds to 'token transfer' instruction
56+
def _check_transfer(self, account_keys, create_token_acc, token_transfer) -> bool:
57+
return account_keys[create_token_acc['accounts'][1]] == account_keys[token_transfer['accounts'][1]]
58+
59+
60+
def _airdrop_to(self, create_acc):
61+
eth_address = "0x" + bytearray(base58.b58decode(create_acc['data'])[20:][:20]).hex()
62+
63+
if eth_address in self.airdrop_ready: # transaction already processed
64+
return
65+
66+
logger.info(f"Airdrop to address: {eth_address}")
67+
68+
json_data = { 'wallet': eth_address, 'amount': self.airdrop_amount }
69+
resp = requests.post(self.faucet_url + '/request_eth_token', json = json_data)
70+
if not resp.ok:
71+
logger.warning(f'Failed to airdrop: {resp.status_code}')
72+
return
73+
74+
self.airdrop_ready[eth_address] = create_acc
75+
76+
77+
def process_trx_airdropper_mode(self, trx):
78+
if check_error(trx):
79+
return
80+
81+
# helper function finding all instructions that satisfies predicate
82+
def find_instructions(trx, predicate):
83+
return [instr for instr in trx['transaction']['message']['instructions'] if predicate(instr)]
84+
85+
account_keys = trx["transaction"]["message"]["accountKeys"]
86+
87+
# Finding instructions specific for airdrop.
88+
# Airdrop triggers on sequence:
89+
# neon.CreateAccount -> neon.CreateERC20TokenAccount -> spl.Transfer (maybe shuffled)
90+
# First: select all instructions that can form such chains
91+
predicate = lambda instr: account_keys[instr['programIdIndex']] == self.evm_loader_id \
92+
and base58.b58decode(instr['data'])[0] == 0x02
93+
create_acc_list = find_instructions(trx, predicate)
94+
95+
predicate = lambda instr: account_keys[instr['programIdIndex']] == self.evm_loader_id \
96+
and base58.b58decode(instr['data'])[0] == 0x0f
97+
create_token_acc_list = find_instructions(trx, predicate)
98+
99+
predicate = lambda instr: account_keys[instr['programIdIndex']] == 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA' \
100+
and base58.b58decode(instr['data'])[0] == 0x03
101+
token_transfer_list = find_instructions(trx, predicate)
102+
103+
# Second: Find exact chains of instructions in sets created previously
104+
for create_acc in create_acc_list:
105+
for create_token_acc in create_token_acc_list:
106+
if not self._check_create_instr(account_keys, create_acc, create_token_acc):
107+
continue
108+
for token_transfer in token_transfer_list:
109+
if not self._check_transfer(account_keys, create_token_acc, token_transfer):
110+
continue
111+
self._airdrop_to(create_acc)
112+
113+
114+
def process_functions(self):
115+
IndexerBase.process_functions(self)
116+
logger.debug("Process receipts")
117+
self.process_receipts()
118+
119+
120+
def process_receipts(self):
121+
counter = 0
122+
for signature in self.transaction_order:
123+
counter += 1
124+
if signature in self.transaction_receipts:
125+
trx = self.transaction_receipts[signature]
126+
if trx is None:
127+
logger.error("trx is None")
128+
del self.transaction_receipts[signature]
129+
continue
130+
if 'slot' not in trx:
131+
logger.debug("\n{}".format(json.dumps(trx, indent=4, sort_keys=True)))
132+
exit()
133+
if trx['transaction']['message']['instructions'] is not None:
134+
self.process_trx_airdropper_mode(trx)
135+
136+
137+
def run_airdropper(solana_url,
138+
evm_loader_id,
139+
faucet_url = '',
140+
wrapper_whitelist = [],
141+
airdrop_amount = 10,
142+
log_level = 'INFO'):
143+
logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s')
144+
logger.setLevel(logging.DEBUG)
145+
logger.info(f"""Running indexer with params:
146+
solana_url: {solana_url},
147+
evm_loader_id: {evm_loader_id},
148+
log_level: {log_level},
149+
faucet_url: {faucet_url},
150+
wrapper_whitelist: {wrapper_whitelist},
151+
airdrop_amount: {airdrop_amount}""")
152+
153+
airdropper = Airdropper(solana_url,
154+
evm_loader_id,
155+
faucet_url,
156+
wrapper_whitelist,
157+
airdrop_amount,
158+
log_level)
159+
airdropper.run()
160+
161+
162+
if __name__ == "__main__":
163+
solana_url = os.environ.get('SOLANA_URL', 'http://localhost:8899')
164+
evm_loader_id = os.environ.get('EVM_LOADER_ID', '53DfF883gyixYNXnM7s5xhdeyV8mVk9T4i2hGV9vG9io')
165+
faucet_url = os.environ.get('FAUCET_URL', 'http://localhost:3333')
166+
wrapper_whitelist = os.environ.get('INDEXER_ERC20_WRAPPER_WHITELIST', '').split(',')
167+
airdrop_amount = os.environ.get('AIRDROP_AMOUNT', 0)
168+
log_level = os.environ.get('LOG_LEVEL', 'INFO')
169+
170+
run_airdropper(solana_url,
171+
evm_loader_id,
172+
faucet_url,
173+
wrapper_whitelist,
174+
airdrop_amount,
175+
log_level)

proxy/testing/mock_server.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import requests
2+
3+
from flask import Flask, request
4+
from threading import Thread
5+
6+
class MockServer(Thread):
7+
def __init__(self, port):
8+
super().__init__()
9+
self.port = port
10+
self.app = Flask(__name__)
11+
self.url = "http://localhost:%s" % self.port
12+
self.app.add_url_rule("/shutdown", view_func=self._shutdown_server)
13+
14+
def add_url_rule(self, url, callback, methods):
15+
self.app.add_url_rule(url, view_func=callback, methods=methods)
16+
17+
def _shutdown_server(self):
18+
if not 'werkzeug.server.shutdown' in request.environ:
19+
raise RuntimeError('Not running the development server')
20+
request.environ['werkzeug.server.shutdown']()
21+
return 'Server shutting down...'
22+
23+
def run(self):
24+
self.app.run(port=self.port)
25+
26+
def shutdown_server(self):
27+
requests.get("http://localhost:%s/shutdown" % self.port)
28+
self.join()

proxy/testing/test_airdropper.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import unittest
2+
from proxy.testing.mock_server import MockServer
3+
from proxy.indexer.airdropper import Airdropper
4+
from proxy.indexer.sql_dict import SQLDict
5+
import time
6+
from flask import request, Response
7+
from unittest.mock import MagicMock, patch, call, ANY
8+
import itertools
9+
from proxy.testing.transactions import pre_token_airdrop_trx1, pre_token_airdrop_trx2,\
10+
create_sol_acc_and_airdrop_trx, wrapper_whitelist, evm_loader_addr, token_airdrop_address1, \
11+
token_airdrop_address2, token_airdrop_address3
12+
13+
class MockFaucet(MockServer):
14+
def __init__(self, port):
15+
super().__init__(port)
16+
self.request_eth_token_mock = MagicMock()
17+
self.request_eth_token_mock.side_effect = itertools.repeat({})
18+
self.add_url_rule("/request_eth_token", callback=self.request_eth_token, methods=['POST'])
19+
20+
def request_eth_token(self):
21+
req = request.get_json()
22+
return self.request_eth_token_mock(req)
23+
24+
25+
def create_signature_for_address(signature: str):
26+
return {
27+
'blockTime': 1638177745, # not make sense
28+
'confirmationStatus': 'finalized',
29+
'err': None,
30+
'memo': None,
31+
'signature': signature,
32+
'slot': 9748200 # not make sense
33+
}
34+
35+
36+
def create_get_signatures_for_address(signatures: list):
37+
return {
38+
'jsonrpc': '2.0',
39+
'result': [ create_signature_for_address(sign) for sign in signatures ],
40+
'id': 1
41+
}
42+
43+
44+
class Test_Airdropper(unittest.TestCase):
45+
@classmethod
46+
def setUpClass(cls) -> None:
47+
print("testing indexer in airdropper mode")
48+
cls.address = 'localhost'
49+
cls.faucet_port = 3333
50+
cls.airdrop_amount = 10
51+
52+
cls.faucet = MockFaucet(cls.faucet_port)
53+
cls.faucet.start()
54+
time.sleep(0.2)
55+
56+
cls.evm_loader_id = evm_loader_addr
57+
cls.wrapper_whitelist = wrapper_whitelist
58+
cls.airdropper = Airdropper(f'http://{cls.address}:8899',
59+
cls.evm_loader_id,
60+
f'http://{cls.address}:{cls.faucet_port}',
61+
cls.wrapper_whitelist,
62+
cls.airdrop_amount,
63+
'INFO')
64+
65+
66+
@classmethod
67+
def tearDownClass(cls) -> None:
68+
cls.faucet.shutdown_server()
69+
cls.faucet.join()
70+
71+
72+
@patch.object(SQLDict, '__setitem__')
73+
@patch.object(SQLDict, '__contains__')
74+
def test_success_process_trx_with_one_airdrop(self,
75+
mock_sql_dict_contains,
76+
mock_sql_dict_setitem):
77+
print("\n\nShould airdrop to new address - one target in transaction")
78+
mock_sql_dict_contains.side_effect = [False] # new eth address
79+
self.faucet.request_eth_token_mock.side_effect = [Response("{}", status=200, mimetype='application/json')]
80+
81+
self.airdropper.process_trx_airdropper_mode(pre_token_airdrop_trx1)
82+
83+
mock_sql_dict_contains.assert_called_once_with(token_airdrop_address1)
84+
mock_sql_dict_setitem.assert_has_calls([call(token_airdrop_address1, ANY)])
85+
json_req = {'wallet': token_airdrop_address1, 'amount': self.airdrop_amount}
86+
self.faucet.request_eth_token_mock.assert_called_once_with(json_req)
87+
self.faucet.request_eth_token_mock.reset_mock()
88+
89+
90+
@patch.object(Airdropper, '_is_allowed_wrapper_contract')
91+
@patch.object(SQLDict, '__setitem__')
92+
@patch.object(SQLDict, '__contains__')
93+
def test_failed_airdrop_contract_not_in_whitelist(self,
94+
mock_sql_dict_contains,
95+
mock_sql_dict_setitem,
96+
mock_is_allowed_contract):
97+
print("\n\nShould not airdrop for contract that is not in whitelist")
98+
mock_is_allowed_contract.side_effect = [False]
99+
self.airdropper.process_trx_airdropper_mode(pre_token_airdrop_trx1)
100+
101+
mock_is_allowed_contract.assert_called_once()
102+
mock_sql_dict_contains.assert_not_called()
103+
mock_sql_dict_setitem.assert_not_called()
104+
self.faucet.request_eth_token_mock.assert_not_called()
105+
self.faucet.request_eth_token_mock.reset_mock()
106+
107+
108+
@patch.object(SQLDict, '__setitem__')
109+
@patch.object(SQLDict, '__contains__')
110+
def test_faucet_failure(self,
111+
mock_sql_dict_contains,
112+
mock_sql_dict_setitem):
113+
print("\n\nShould not add address to processed list due to faucet error")
114+
mock_sql_dict_contains.side_effect = [False] # new eth address
115+
self.faucet.request_eth_token_mock.side_effect = [Response("{}", status=400, mimetype='application/json')]
116+
117+
self.airdropper.process_trx_airdropper_mode(pre_token_airdrop_trx1)
118+
119+
mock_sql_dict_contains.assert_called_once_with(token_airdrop_address1)
120+
mock_sql_dict_setitem.assert_not_called()
121+
json_req = {'wallet': token_airdrop_address1, 'amount': self.airdrop_amount}
122+
self.faucet.request_eth_token_mock.assert_called_once_with(json_req)
123+
self.faucet.request_eth_token_mock.reset_mock()
124+
125+
126+
@patch.object(SQLDict, '__setitem__')
127+
@patch.object(SQLDict, '__contains__')
128+
def test_process_trx_with_one_airdrop_for_already_processed_address(self,
129+
mock_sql_dict_contains,
130+
mock_sql_dict_setitem):
131+
print("\n\nShould not airdrop to repeated address")
132+
mock_sql_dict_contains.side_effect = [True] # eth address processed later
133+
134+
self.airdropper.process_trx_airdropper_mode(pre_token_airdrop_trx1)
135+
136+
mock_sql_dict_contains.assert_called_once_with(token_airdrop_address1)
137+
mock_sql_dict_setitem.assert_not_called()
138+
self.faucet.request_eth_token_mock.assert_not_called()
139+
self.faucet.request_eth_token_mock.reset_mock()
140+
141+
142+
@patch.object(SQLDict, '__setitem__')
143+
@patch.object(SQLDict, '__contains__')
144+
def test_complex_transation(self,
145+
mock_sql_dict_contains,
146+
mock_sql_dict_setitem):
147+
print("\n\nShould airdrop to several targets in one transaction")
148+
mock_sql_dict_contains.side_effect = [False, False] # both targets are new
149+
self.faucet.request_eth_token_mock.side_effect = [Response("{}", status=200, mimetype='application/json'),
150+
Response("{}", status=200, mimetype='application/json')]
151+
152+
self.airdropper.process_trx_airdropper_mode(pre_token_airdrop_trx2)
153+
154+
mock_sql_dict_contains.assert_has_calls([call(token_airdrop_address3),
155+
call(token_airdrop_address2)])
156+
mock_sql_dict_setitem.assert_has_calls([call(token_airdrop_address3, ANY),
157+
call(token_airdrop_address2, ANY)])
158+
json_req1 = {'wallet': token_airdrop_address2, 'amount': self.airdrop_amount}
159+
json_req2 = {'wallet': token_airdrop_address3, 'amount': self.airdrop_amount}
160+
self.faucet.request_eth_token_mock.assert_has_calls([call(json_req2), call(json_req1)])
161+
self.faucet.request_eth_token_mock.reset_mock()
162+
163+
164+
@patch.object(SQLDict, '__setitem__')
165+
@patch.object(SQLDict, '__contains__')
166+
def test_no_airdrop_instructions(self,
167+
mock_sql_dict_contains,
168+
mock_sql_dict_setitem):
169+
print("\n\nShould not airdrop when instructions are inconsistent")
170+
self.airdropper.process_trx_airdropper_mode(create_sol_acc_and_airdrop_trx)
171+
172+
mock_sql_dict_contains.assert_not_called()
173+
mock_sql_dict_setitem.assert_not_called()
174+
self.faucet.request_eth_token_mock.assert_not_called()
175+
self.faucet.request_eth_token_mock.reset_mock()
176+

0 commit comments

Comments
 (0)