Skip to content

Commit e497c4c

Browse files
authored
Merge pull request #379 from SomberNight/202606_mempool3
mempool: put more event-loop-blocking calls to other thread
2 parents 29f54ba + 20e7079 commit e497c4c

2 files changed

Lines changed: 28 additions & 20 deletions

File tree

src/electrumx/server/mempool.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,9 @@ async def _refresh_hashes(self, synchronized_event):
317317
while True:
318318
height = self.api.cached_height()
319319
txids_hum = await self.api.mempool_txids_hum()
320-
if height != await self.api.height():
320+
if height != await self.api.height(): # if height changed *again*, re-start
321321
continue
322-
txids_rev = {hex_str_to_hash(hh) for hh in txids_hum}
322+
txids_rev = await run_in_thread(lambda: {hex_str_to_hash(hh) for hh in txids_hum})
323323
try:
324324
async with self.lock:
325325
await self._process_mempool(
@@ -364,25 +364,31 @@ async def _process_mempool(
364364

365365
# 1. Handle txs that have disappeared (evicted, just got mined, etc)
366366
# TODO split disappeared txs workload into a threadpool, chunks of ~200 txs
367-
for txid_rev in (set(txs) - all_txids_rev):
368-
tx = txs.pop(txid_rev)
369-
# hashXs
370-
tx_hashXs = {hashX for hashX, value in tx.in_pairs}
371-
tx_hashXs.update(hashX for hashX, value in tx.out_pairs)
372-
for hashX in tx_hashXs:
373-
hashXs[hashX].remove(txid_rev)
374-
if not hashXs[hashX]:
375-
del hashXs[hashX]
376-
touched_hashxs |= tx_hashXs
377-
# outpoints
378-
for prevout in tx.prevouts:
379-
del txo_to_spender[prevout]
380-
touched_outpoints.add(prevout)
381-
for out_idx, out_pair in enumerate(tx.out_pairs):
382-
touched_outpoints.add((txid_rev, out_idx))
367+
def handle_disappeared_txs() -> int:
368+
nonlocal touched_hashxs
369+
disappeared_hashes = set(txs) - all_txids_rev
370+
for txid_rev in disappeared_hashes:
371+
tx = txs.pop(txid_rev)
372+
# hashXs
373+
tx_hashXs = {hashX for hashX, value in tx.in_pairs}
374+
tx_hashXs.update(hashX for hashX, value in tx.out_pairs)
375+
for hashX in tx_hashXs:
376+
hashXs[hashX].remove(txid_rev)
377+
if not hashXs[hashX]:
378+
del hashXs[hashX]
379+
touched_hashxs |= tx_hashXs
380+
# outpoints
381+
for prevout in tx.prevouts:
382+
del txo_to_spender[prevout]
383+
touched_outpoints.add(prevout)
384+
for out_idx, out_pair in enumerate(tx.out_pairs):
385+
touched_outpoints.add((txid_rev, out_idx))
386+
return len(disappeared_hashes)
387+
388+
await run_in_thread(handle_disappeared_txs)
383389

384390
# 2. Process new transactions
385-
new_hashes = list(all_txids_rev.difference(txs))
391+
new_hashes = await run_in_thread(lambda: list(all_txids_rev.difference(txs)))
386392
if new_hashes:
387393
# 2.1. fetch raw txs from bitcoin daemon
388394
group = OldTaskGroup()

tests/server/test_mempool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,13 @@ def in_caplog(caplog, message):
267267
@pytest.mark.asyncio
268268
async def test_keep_synchronized(caplog):
269269
api = API()
270-
mempool = MemPool(coin, api)
270+
mempool = MemPool(coin, api, refresh_secs=0.01)
271271
event = Event()
272272
with caplog.at_level(logging.INFO):
273273
async with OldTaskGroup() as group:
274274
await group.spawn(mempool.keep_synchronized, event)
275+
# do two iterations of _refresh_hashes, then cancel:
276+
await event.wait()
275277
await event.wait()
276278
await group.cancel_remaining()
277279

0 commit comments

Comments
 (0)