Skip to content

Commit e1fae39

Browse files
committed
tmp: split advance_txs 9
1 parent 2017d7a commit e1fae39

1 file changed

Lines changed: 41 additions & 49 deletions

File tree

src/electrumx/server/block_processor.py

Lines changed: 41 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@
1010

1111

1212
import asyncio
13+
import concurrent.futures
1314
from concurrent.futures import ThreadPoolExecutor
1415
import time
1516
from typing import Sequence, Tuple, List, Callable, Optional, TYPE_CHECKING, Type, Set
16-
try:
17-
from concurrent.futures import InterpreterPoolExecutor
18-
except ImportError:
19-
InterpreterPoolExecutor = None
2017

2118
from aiorpcx import run_in_thread, CancelledError, ignore_after
2219

@@ -41,19 +38,6 @@
4138
from electrumx.server.controller import Notifications
4239

4340

44-
class _myglobals:
45-
coin = None # type: type[Coin]
46-
47-
48-
def _init_pool(coin2):
49-
_myglobals.coin = coin2
50-
51-
52-
def eval_g_coin_block(raw_block: bytes, height: int) -> 'Block':
53-
return _myglobals.coin.block(raw_block, height)
54-
55-
56-
5741
class Prefetcher:
5842
'''Prefetches blocks (in the forward direction only).'''
5943

@@ -80,7 +64,7 @@ def __init__(
8064
self.cache_size = 0
8165
self.min_cache_size = 10 * 1024 * 1024
8266
# This makes the first fetch be 10 blocks
83-
self.ave_size = self.min_cache_size // 10
67+
self.ave_size = max(1, self.min_cache_size // 10)
8468
self.polling_delay = polling_delay_secs
8569

8670
async def main_loop(self, bp_height: int) -> None:
@@ -258,7 +242,7 @@ async def check_and_advance_blocks(self, raw_blocks: Sequence[bytes]) -> None:
258242
if not raw_blocks:
259243
return
260244
first = self.height + 1
261-
blocks = self.pool_executor.map(self.coin.block, raw_blocks, range(first, first + (len(raw_blocks))))
245+
blocks = self.pool_executor1.map(self.coin.block, raw_blocks, range(first, first + (len(raw_blocks))))
262246
blocks = list(blocks) # join threads
263247
headers = [block.header for block in blocks]
264248
hprevs = [self.coin.header_prevhash_rev(h) for h in headers]
@@ -461,7 +445,6 @@ def advance_blocks(self, blocks: Sequence['Block']) -> None:
461445
assert self.state_lock.locked()
462446
assert blocks
463447
min_height = self.db.min_undo_height(self.daemon.cached_height())
464-
genesis_activation = self.coin.GENESIS_ACTIVATION
465448
coin = self.coin
466449

467450
tx_num = self.tx_count
@@ -475,30 +458,36 @@ def advance_blocks(self, blocks: Sequence['Block']) -> None:
475458
self.db.tx_counts.append(tx_num)
476459
self.tx_count = tx_num
477460

461+
# process tx outputs
462+
blk_process_outputs = [] # type: list[concurrent.futures.Future]
478463
height = self.height
479464
for block in blocks:
480465
height += 1
481-
tx_num_start = self.db.tx_counts[height-1] if height > 0 else 0
482-
is_unspendable = (is_unspendable_genesis if height >= genesis_activation
483-
else is_unspendable_legacy)
484-
add_unflushed_hist1 = self.advance_txs_process_outputs(
466+
fut = self.pool_executor1.submit(
467+
self.advance_txs_process_outputs,
485468
block.transactions,
486-
tx_num_start=tx_num_start,
487-
is_unspendable=is_unspendable,
469+
height=height,
488470
)
489-
add_unflushed_hist1()
471+
blk_process_outputs.append(fut)
472+
for fut in blk_process_outputs:
473+
add_unflushed_hist = fut.result()
474+
add_unflushed_hist()
490475

476+
# process tx inputs
477+
blk_process_inputs = [] # type: list[concurrent.futures.Future]
491478
height = self.height
492479
for block in blocks:
493480
height += 1
494-
tx_num_start = self.db.tx_counts[height - 1] if height > 0 else 0
495-
add_unflushed_hist2 = self.advance_txs_process_inputs(
481+
fut = self.pool_executor1.submit(
482+
self.advance_txs_process_inputs,
496483
block.transactions,
497-
tx_num_start=tx_num_start,
498484
height=height,
499485
add_undo_info=height >= min_height,
500486
)
501-
add_unflushed_hist2()
487+
blk_process_inputs.append(fut)
488+
for fut in blk_process_inputs:
489+
add_unflushed_hist = fut.result()
490+
add_unflushed_hist()
502491

503492
height = self.height
504493
for block in blocks:
@@ -519,10 +508,14 @@ def advance_txs_process_outputs(
519508
self,
520509
txs: Sequence[Tx],
521510
*,
522-
is_unspendable: Callable[[bytes], bool],
523-
tx_num_start: int,
511+
height: int,
524512
) -> Callable[[], None]:
525513

514+
tx_num_start = self.db.tx_counts[height - 1] if height > 0 else 0
515+
is_unspendable = (
516+
is_unspendable_genesis if height >= self.coin.GENESIS_ACTIVATION
517+
else is_unspendable_legacy)
518+
526519
# Use local vars for speed in the loops
527520
script_hashX = self.coin.hashX_from_script
528521
put_utxo = self.utxo_cache.__setitem__
@@ -556,7 +549,7 @@ def process_txouts_for_chunk(txs_chunk):
556549
for tx_pos, tx in txs_chunk:
557550
process_txouts_for_single_tx(tx_pos=tx_pos, tx=tx)
558551

559-
list(self.pool_executor.map(
552+
list(self.pool_executor2.map(
560553
process_txouts_for_chunk,
561554
chunks(list(enumerate(txs)), 200),
562555
))
@@ -571,11 +564,12 @@ def advance_txs_process_inputs(
571564
self,
572565
txs: Sequence[Tx],
573566
*,
574-
tx_num_start: int,
575567
height: int,
576568
add_undo_info: bool,
577569
) -> Callable[[], None]:
578570

571+
tx_num_start = self.db.tx_counts[height - 1] if height > 0 else 0
572+
579573
# Use local vars for speed in the loops
580574
bl_undo_info = [b"" for _ in txs] # type: list[bytes]
581575
spend_utxo = self.spend_utxo
@@ -606,7 +600,7 @@ def process_txins_for_chunk(txs_chunk):
606600
for tx_pos, tx in txs_chunk:
607601
process_txins_for_single_tx(tx_pos=tx_pos, tx=tx)
608602

609-
list(self.pool_executor.map(
603+
list(self.pool_executor2.map(
610604
process_txins_for_chunk,
611605
chunks(list(enumerate(txs)), 200),
612606
))
@@ -865,19 +859,17 @@ async def fetch_and_process_blocks(self, caught_up_event: asyncio.Event) -> None
865859
'''
866860
self._caught_up_event = caught_up_event
867861
await self._first_open_dbs()
868-
XPoolExecutor = ThreadPoolExecutor
869-
# XPoolExecutor = InterpreterPoolExecutor or ThreadPoolExecutor
870-
self.logger.info(f"using pool executor: {XPoolExecutor}")
871-
with XPoolExecutor(initializer=_init_pool, initargs=(self.coin,)) as self.pool_executor:
872-
try:
873-
async with OldTaskGroup() as group:
874-
await group.spawn(self.prefetcher.main_loop(self.height))
875-
await group.spawn(self._process_prefetched_blocks())
876-
# Don't flush for arbitrary exceptions as they might be a cause or consequence of
877-
# corrupted data
878-
except CancelledError:
879-
self.logger.info('flushing to DB for a clean shutdown...')
880-
await self.flush(True)
862+
with ThreadPoolExecutor() as self.pool_executor1:
863+
with ThreadPoolExecutor() as self.pool_executor2:
864+
try:
865+
async with OldTaskGroup() as group:
866+
await group.spawn(self.prefetcher.main_loop(self.height))
867+
await group.spawn(self._process_prefetched_blocks())
868+
# Don't flush for arbitrary exceptions as they might be a cause or consequence of
869+
# corrupted data
870+
except CancelledError:
871+
self.logger.info('flushing to DB for a clean shutdown...')
872+
await self.flush(True)
881873

882874
def force_chain_reorg(self, count: int) -> bool:
883875
'''Force a reorg of the given number of blocks.

0 commit comments

Comments
 (0)