Skip to content

Commit d97361e

Browse files
author
Neil Booth
committed
Start on chain state
1 parent 41d2dab commit d97361e

File tree

5 files changed

+223
-240
lines changed

5 files changed

+223
-240
lines changed

electrumx/lib/coins.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,9 @@ class BitcoinTestnetMixin:
199199
GENESIS_HASH = ('000000000933ea01ad0ee984209779ba'
200200
'aec3ced90fa3f408719526f8d77f4943')
201201
REORG_LIMIT = 8000
202-
CHAIN_SIZE = 20_000_000_000
203-
CHAIN_SIZE_HEIGHT = 1_454_000
204-
AVG_BLOCK_SIZE = 10_000
202+
CHAIN_SIZE = 26_584_216_544
203+
CHAIN_SIZE_HEIGHT = 1_454_438
204+
AVG_BLOCK_SIZE = 200_000
205205

206206
RPC_PORT = 18332
207207
PEER_DEFAULT_PORTS = {'t': '51001', 's': '51002'}

electrumx/server/block_processor.py

Lines changed: 46 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class OnDiskBlock:
4343
tasks = {}
4444
# If set it logs the next time a block is processed
4545
daemon_height = None
46-
chain_size = 0
46+
state = None
4747

4848
def __init__(self, hex_hash, height, size):
4949
self.hex_hash = hex_hash
@@ -92,7 +92,7 @@ def iter_txs(self):
9292
if self.daemon_height:
9393
logger.info(f'height {self.height:,d} of {self.daemon_height:,d} {self.hex_hash} '
9494
f'{self.date_str()} {self.size / 1_000_000_000:.3f}GB {tx_count:,d} txs '
95-
f'chain {self.chain_size // 1_000_000_000:,d}GB')
95+
f'chain {self.state.chain_size // 1_000_000_000:,d}GB')
9696
OnDiskBlock.daemon_height = None
9797

9898
count = 0
@@ -286,12 +286,11 @@ def __init__(self, env, db, daemon, notifications):
286286
self.touched = set()
287287
# A count >= 0 is a user-forced reorg; < 0 is a natural reorg
288288
self.reorg_count = None
289-
self.height = -1
290-
self.tip = None
291-
self.tx_count = 0
292-
self.chain_size = 0
293289
self.force_flush_arg = None
294290

291+
# State. Initially taken from DB;
292+
self.state = None
293+
295294
# Caches of unflushed items.
296295
self.headers = []
297296
self.tx_hashes = []
@@ -304,7 +303,7 @@ def __init__(self, env, db, daemon, notifications):
304303
# Signalled after backing up during a reorg to flush session manager caches
305304
self.backed_up_event = asyncio.Event()
306305

307-
# When the lock is acquired, in-memory chain state is consistent with self.height.
306+
# When the lock is acquired, in-memory chain state is consistent with state.height.
308307
# This is a requirement for safe flushing.
309308
self.state_lock = asyncio.Lock()
310309

@@ -322,7 +321,7 @@ async def next_block_hashes(self, count=30):
322321
daemon_height = await self.daemon.height()
323322

324323
# Fetch remaining block hashes to a limit
325-
first = self.height + 1
324+
first = self.state.height + 1
326325
n = min(daemon_height - first + 1, count * 2)
327326
if n:
328327
hex_hashes = await self.daemon.block_hex_hashes(first, n)
@@ -353,15 +352,15 @@ async def reorg_chain(self, count):
353352
await OnDiskBlock.prefetch_many(self.daemon, pairs, 'reorg')
354353

355354
for hex_hash in reversed(hex_hashes):
356-
if hex_hash != hash_to_hex_str(self.tip):
355+
if hex_hash != hash_to_hex_str(self.state.tip):
357356
logger.error(f'block {hex_hash} is not tip; cannot back up')
358357
return
359358
block = await OnDiskBlock.streamed_block(hex_hash)
360359
if not block:
361360
break
362361
await self.run_with_lock(run_in_thread(self.backup_block, block))
363362

364-
logger.info(f'backed up to height {self.height:,d}')
363+
logger.info(f'backed up to height {self.state.height:,d}')
365364
self.backed_up_event.set()
366365
self.backed_up_event.clear()
367366

@@ -395,9 +394,10 @@ def diff_pos(hashes1, hashes2):
395394
return n
396395
return len(hashes)
397396

397+
height = self.state.height
398398
if count < 0:
399399
# A real reorg
400-
start = self.height - 1
400+
start = height - 1
401401
count = 1
402402
while start > 0:
403403
hashes = await self.db.fs_block_hashes(start, count)
@@ -410,27 +410,25 @@ def diff_pos(hashes1, hashes2):
410410
count = min(count * 2, start)
411411
start -= count
412412

413-
count = (self.height - start) + 1
413+
count = (height - start) + 1
414414
else:
415-
start = (self.height - count) + 1
415+
start = (height - count) + 1
416416

417417
return start, count
418418

419419
# - Flushing
420420
def flush_data(self):
421421
'''The data for a flush.'''
422-
return FlushData(self.height, self.tx_count, self.chain_size, self.headers,
423-
self.tx_hashes, self.undo_infos, self.utxo_cache,
424-
self.db_deletes, self.tip)
422+
return FlushData(self.state, self.headers, self.tx_hashes, self.undo_infos,
423+
self.utxo_cache, self.db_deletes)
425424

426425
async def flush(self, flush_utxos):
427426
self.force_flush_arg = None
428427
# Estimate size remaining
429428
daemon_height = self.daemon.cached_height()
430-
tail_size = ((daemon_height - max(self.height, self.coin.CHAIN_SIZE_HEIGHT))
429+
tail_size = ((daemon_height - max(self.state.height, self.coin.CHAIN_SIZE_HEIGHT))
431430
* self.coin.AVG_BLOCK_SIZE)
432-
size_remaining = max(self.coin.CHAIN_SIZE - self.chain_size, 0) + tail_size
433-
logger.info(f'SIZES: {tail_size} {size_remaining}')
431+
size_remaining = max(self.coin.CHAIN_SIZE - self.state.chain_size, 0) + tail_size
434432
await run_in_thread(self.db.flush_dbs, self.flush_data(), flush_utxos, size_remaining)
435433

436434
async def check_cache_size_loop(self):
@@ -444,13 +442,12 @@ async def check_cache_size_loop(self):
444442
db_deletes_size = len(self.db_deletes) * 57
445443
hist_cache_size = self.db.history.unflushed_memsize()
446444
# Roughly ntxs * 32 + nblocks * 42
447-
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
448-
+ (self.height - self.db.fs_height) * 42)
445+
tx_hash_size = ((self.state.tx_count - self.db.fs_tx_count) * 32
446+
+ (self.state.height - self.db.fs_height) * 42)
449447
utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
450448
hist_MB = (hist_cache_size + tx_hash_size) // one_MB
451449

452450
OnDiskBlock.daemon_height = await self.daemon.height()
453-
OnDiskBlock.chain_size = self.chain_size
454451
if hist_MB:
455452
logger.info(f'UTXOs {utxo_MB:,d}MB hist {hist_MB:,d}MB')
456453

@@ -486,9 +483,10 @@ def advance_block(self, block):
486483
self.coin.GENESIS_ACTIVATION else is_unspendable_legacy)
487484

488485
# Use local vars for speed in the loops
486+
state = self.state
489487
tx_hashes = []
490488
undo_info = []
491-
tx_num = self.tx_count
489+
tx_num = state.tx_count
492490
script_hashX = self.coin.hashX_from_script
493491
put_utxo = self.utxo_cache.__setitem__
494492
spend_utxo = self.spend_utxo
@@ -502,7 +500,7 @@ def advance_block(self, block):
502500

503501
self.ok = False
504502
with block as block:
505-
if self.coin.header_prevhash(block.header) != self.tip:
503+
if self.coin.header_prevhash(block.header) != self.state.tip:
506504
self.reorg_count = -1
507505
return
508506

@@ -536,19 +534,19 @@ def advance_block(self, block):
536534
append_tx_hash(tx_hash)
537535
tx_num += 1
538536

537+
# Do this first - it uses the prior state
539538
self.tx_hashes.append(b''.join(tx_hashes))
540-
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
541-
542-
self.tx_count = tx_num
539+
self.db.history.add_unflushed(hashXs_by_tx, state.tx_count)
543540
self.db.tx_counts.append(tx_num)
544-
545541
if block.height >= self.db.min_undo_height(self.daemon.cached_height()):
546542
self.undo_infos.append((undo_info, block.height))
547-
548-
self.height = block.height
549543
self.headers.append(block.header)
550-
self.tip = self.coin.header_hash(block.header)
551-
self.chain_size += block.size
544+
545+
# Update state
546+
state.height = block.height
547+
state.tip = self.coin.header_hash(block.header)
548+
state.chain_size += block.size
549+
state.tx_count = tx_num
552550
self.ok = True
553551

554552
def backup_block(self, block):
@@ -557,7 +555,7 @@ def backup_block(self, block):
557555
assert block.height > 0
558556
genesis_activation = self.coin.GENESIS_ACTIVATION
559557

560-
is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation
558+
is_unspendable = (is_unspendable_genesis if block.height >= genesis_activation
561559
else is_unspendable_legacy)
562560

563561
# Prevout values, in order down the block (coinbase first if present)
@@ -574,7 +572,6 @@ def backup_block(self, block):
574572
undo_entry_len = 13 + HASHX_LEN
575573

576574
count = 0
577-
578575
with block as block:
579576
for tx, tx_hash in block.iter_txs_reversed():
580577
for idx, txout in enumerate(tx.outputs):
@@ -597,12 +594,15 @@ def backup_block(self, block):
597594
count += 1
598595

599596
assert n == 0
600-
self.tx_count -= count
601-
self.chain_size -= block.size
602-
self.tip = self.coin.header_prevhash(block.header)
603-
self.height -= 1
604-
self.db.tx_counts.pop()
605597

598+
# Update state
599+
state = self.state
600+
state.height -= 1
601+
state.tip = self.coin.header_prevhash(block.header)
602+
state.chain_size -= block.size
603+
state.tx_count -= count
604+
605+
self.db.tx_counts.pop()
606606
# self.touched can include other addresses which is harmless, but remove None.
607607
self.touched.discard(None)
608608
self.db.flush_backup(self.flush_data(), self.touched)
@@ -705,27 +705,20 @@ def spend_utxo(self, tx_hash, tx_idx):
705705
raise ChainError(f'UTXO {hash_to_hex_str(tx_hash)} / {tx_idx:,d} not found in "h" table')
706706

707707
async def on_caught_up(self):
708-
is_first_sync = self.db.first_sync
709-
self.db.first_sync = False
708+
was_first_sync = self.state.first_sync
709+
self.state.first_sync = False
710710
await self.flush(True)
711711
if self.caught_up:
712712
# Flush everything before notifying as client queries are performed on the DB
713-
await self.notifications.on_block(self.touched, self.height)
713+
await self.notifications.on_block(self.touched, self.state.height)
714714
self.touched = set()
715715
else:
716716
self.caught_up = True
717-
if is_first_sync:
718-
logger.info(f'{electrumx.version} synced to height {self.height:,d}')
717+
if was_first_sync:
718+
logger.info(f'{electrumx.version} synced to height {self.state.height:,d}')
719719
# Reopen for serving
720720
await self.db.open_for_serving()
721721

722-
async def _first_open_dbs(self):
723-
await self.db.open_for_sync()
724-
self.height = self.db.db_height
725-
self.tip = self.db.db_tip
726-
self.tx_count = self.db.db_tx_count
727-
self.chain_size = self.db.db_chain_size
728-
729722
# --- External API
730723

731724
async def fetch_and_process_blocks(self, caught_up_event):
@@ -740,7 +733,7 @@ async def fetch_and_process_blocks(self, caught_up_event):
740733
disk before exiting, as otherwise a significant amount of work
741734
could be lost.
742735
'''
743-
await self._first_open_dbs()
736+
self.state = OnDiskBlock.state = await self.db.open_for_sync().copy()
744737
await OnDiskBlock.scan_files()
745738

746739
try:
@@ -749,7 +742,7 @@ async def fetch_and_process_blocks(self, caught_up_event):
749742
hex_hashes, daemon_height = await self.next_block_hashes()
750743
if show_summary:
751744
show_summary = False
752-
behind = daemon_height - self.height
745+
behind = daemon_height - self.state.height
753746
if behind > 0:
754747
logger.info(f'catching up to daemon height {daemon_height:,d} '
755748
f'({behind:,d} blocks behind)')

electrumx/server/controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ async def serve(self, shutdown_event):
107107

108108
# Set notifications up to implement the MemPoolAPI
109109
def get_db_height():
110-
return db.db_height
110+
return db.state.height
111111
notifications.height = daemon.height
112112
notifications.db_height = get_db_height
113113
notifications.cached_height = daemon.cached_height

0 commit comments

Comments
 (0)