[WIP] EPD: encode-worker path, async embedding receive, E2P row-sharding#437

Draft
chenht2022 wants to merge 49 commits into main from hongtaoc/epd-encode

@chenht2022

Contributor

Summary

EPD (encode/prefill/decode disaggregation) engine-side line, 45 commits:

  • ViT-only encode worker path (submit_encode ingest API, encode event loop)
  • Poll-driven async embedding receive on prefill (admission via per-cycle MIN all-reduce; P2D sender deferred to admission)
  • Lifetime-registered receive pool (ends stale-rkey QP kills); receive-buffer deregistration deferred past NIC placement tail
  • E2P embedding row-sharding across prefill ranks + concurrent encode sender (default ON)
  • Env-gated tolerant checkpoint load for reduced-layer probe models
  • ring-lease pixel path, decode-on-receive-cycle, mm-strip/encode-send-race correctness fixes

Validation (122B-A10B-NVFP4, 3-node B200, TP1 workers)

  • e2e steady-state P8D2E8: 19.29 req/s (best single-gateway), per-GPU 1.07 req/s
  • OCRBench 500-sample temp0: acc 0.910 (baseline band 0.888-0.894)
  • Row-sharding default-ON smoke at TP1: zero disturbance vs prior deploy
  • TP=4 asymmetric E(TP1)->P(TP4) validated earlier on 397B; TP8 e2e pending (draft until then)

🤖 Generated with Claude Code

@chenht2022 chenht2022 changed the title from "EPD: encode-worker path, async embedding receive, E2P row-sharding" to "[WIP] EPD: encode-worker path, async embedding receive, E2P row-sharding" on Jun 12, 2026
chenht2022 and others added 29 commits June 15, 2026 19:26
Introduce a vision-tower-only "encode" disaggregation role as the first
step toward EPD (encode-prefill-decode). An encode server runs the ViT
only and ships image embeddings to a prefill server; it owns no KV pool
and runs no LM forward.

- DisaggregationMode.ENCODE in runtime/pd/utils.py
- --disaggregation-mode gains the "encode" choice
- resolve_disaggregation/_handle_kvstore treat encode as no-KV, no-prefix-cache

Additive only; no existing prefill/decode path is touched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Add the kEncode variant to the C++ scheduler's DisaggregationMode enum
and expose it as "encode" through the nanobind bindings, matching the
Python-side DisaggregationMode.ENCODE.

The scheduler core does not branch on DisaggregationMode (it is a value
passed through to the Python runtime), so this is additive: no FSM or
variant-exhaustiveness changes are required for the enum value itself.

Verified: rebuilt ext .so exposes DisaggregationMode.encode (value 3).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
The encode (vision-tower-only) worker is orchestrated in Python rather than
through the C++ KV scheduler: it carries no KV cache and runs no LM forward,
so the paged-cache machinery does not apply. Add the two pieces it does need
as standalone, framework-agnostic, GPU-free components:

- EmbeddingCache: bytes-bounded LRU keyed by MultimodalDataItem.hash, so
  duplicate images skip the vision tower (mirrors SGLang MultiModalStaticCache).
- EncodeScheduler: deterministic patch-budget batcher. Items are packed by a
  per-batch token budget + max item count, ordered by (request_id, item_index)
  so every tensor-parallel rank forms an identical ViT batch (avoids NCCL
  deadlock in a TP vision tower, mirroring the C++ scheduler's id tie-break).

Both are decoupled (the encode loop checks the cache on arrival and only feeds
misses to the scheduler) and covered by unit tests (no torch import).
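
A minimal sketch of the two components described above, for illustration only. The names `BytesBoundedLRU`, `PendingItem`, and `form_batches` are hypothetical and are not the repository's `EmbeddingCache` / `EncodeScheduler`; the sketch assumes cached values are raw bytes and that an item larger than the budget is sent alone in its own batch.

```python
from collections import OrderedDict
from dataclasses import dataclass


class BytesBoundedLRU:
    """Hypothetical stand-in for a bytes-bounded LRU keyed by item hash."""

    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        self.used_bytes = 0
        self._entries: "OrderedDict[str, bytes]" = OrderedDict()

    def get(self, key: str):
        value = self._entries.get(key)
        if value is not None:
            self._entries.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key: str, value: bytes) -> None:
        if len(value) > self.max_bytes:
            return  # can never fit, so do not cache it
        if key in self._entries:
            self.used_bytes -= len(self._entries.pop(key))
        self._entries[key] = value
        self.used_bytes += len(value)
        while self.used_bytes > self.max_bytes:
            _, evicted = self._entries.popitem(last=False)  # oldest first
            self.used_bytes -= len(evicted)


@dataclass(frozen=True)
class PendingItem:
    request_id: str
    item_index: int
    num_tokens: int


def form_batches(items, token_budget: int, max_items: int):
    """Pack items in (request_id, item_index) order, independent of arrival order."""
    batches, current, used = [], [], 0
    for item in sorted(items, key=lambda i: (i.request_id, i.item_index)):
        over_budget = used + item.num_tokens > token_budget
        if current and (over_budget or len(current) >= max_items):
            batches.append(current)
            current, used = [], 0
        current.append(item)
        used += item.num_tokens
    if current:
        batches.append(current)
    return batches
```

Because the sort key does not depend on arrival order, two ranks that saw the same items in a different order still form the same batches, which is the property the tie-break is meant to guarantee.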

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
The encode->prefill embedding hop reuses the Mooncake RDMA engine but ships a
single contiguous [num_tokens, hidden] tensor per item instead of per-layer KV
pages. Add the wire-protocol dataclasses for that hop:

- EmbeddingArgs: per-rank buffer/engine info (no KV pool); optional second
  buffer for deepstack embeddings.
- EmbeddingChunk: in-process transfer unit on the encode side (not serialized).
- EmbeddingArgsRegisterInfo / EmbeddingTransferInfo: the receiver(prefill)->
  sender(encode) registration and per-request pre-alloc frames, each owning
  its byte layout via symmetric to_zmq()/from_zmq() so frame positions live in
  one place (the KV path hardcodes them at both ends).

Pointers are packed as full uint64 ("Q"); the per-request frame carries the
n_tokens/hidden/dtype the encode side must assert before the unchecked RDMA
write (gotcha G2). Deepstack travels as a distinct second buffer.

Frames cross ZMQ only between control threads; managers/sender/receiver that
drive them follow next. Covered by GPU-free round-trip tests (7) that lock the
layout and 64-bit pointer encoding.
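
The symmetric `to_zmq()`/`from_zmq()` idea can be sketched as below. This is an illustration under assumptions, not the real `EmbeddingTransferInfo`: the class name `TransferInfoSketch`, its field set, the frame order, and the helper `check_before_write` are all hypothetical. Only the use of `struct` format `"Q"` for 64-bit values comes from the text above.

```python
import struct
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class TransferInfoSketch:
    """Illustrative per-request frame set; field names are assumptions."""

    room: int
    dst_ptr: int   # receive-buffer address, needs the full 64 bits
    n_tokens: int
    hidden: int
    dtype: str

    def to_zmq(self) -> List[bytes]:
        # The frame order is defined here and mirrored only in from_zmq().
        return [
            str(self.room).encode("ascii"),
            struct.pack("Q", self.dst_ptr),
            struct.pack("Q", self.n_tokens),
            struct.pack("Q", self.hidden),
            self.dtype.encode("ascii"),
        ]

    @classmethod
    def from_zmq(cls, frames: List[bytes]) -> "TransferInfoSketch":
        room, dst_ptr, n_tokens, hidden, dtype = frames
        return cls(
            room=int(room.decode("ascii")),
            dst_ptr=struct.unpack("Q", dst_ptr)[0],
            n_tokens=struct.unpack("Q", n_tokens)[0],
            hidden=struct.unpack("Q", hidden)[0],
            dtype=dtype.decode("ascii"),
        )


def check_before_write(info: TransferInfoSketch, n_tokens: int, hidden: int, dtype: str) -> None:
    """Sender-side guard: refuse the unchecked write if the shapes disagree."""
    if (info.n_tokens, info.hidden, info.dtype) != (n_tokens, hidden, dtype):
        raise ValueError("embedding contract mismatch; refusing the write")
```

A round-trip test over such a class pins both the frame positions and the 8-byte pointer width in one place.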

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
MooncakeEmbeddingSender is the encode-side per-room sender, mirroring
MooncakeKVSender (sender.py:37): poll/clear/failure_exception are status-only
and faithfully reused; only send() differs, queueing one contiguous embedding
tensor (described by scalar ptr/shape/dtype/nbytes the encode executor reads off
item.encoded) instead of KV indices. Keeping send() on scalars rather than a
torch tensor leaves this class torch-free and unit-testable.

Covered by 6 fake-manager tests (init->Bootstrapping, chunk stamping incl.
deepstack fields, poll conclude-and-stick, Failed, failure_exception raises +
clears). The Mooncake manager (bootstrap thread + transfer_worker driving the
actual RDMA batch_transfer_sync) and the prefill-side receiver follow next;
those need GPU/RDMA to validate end to end.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
assign_encoded_embeddings turns one packed vision-tower output
[sum_tokens, width] into per-item item.encoded(+encoded_deepstack), which is
the silent-corruption-prone core of the encode worker: rows are split by each
item's post-merge token count (item.offsets via _item_token_count) and, for
deepstack models, columns are split via model.separate_deepstack_embeds into
main [N, hidden] + deepstack [N, hidden*ndeep]. Outputs are made contiguous
since a TP-gathered tower output may not be and the transfer ships raw bytes.

Isolated from the (GPU/RDMA-bound) executor + manager so the index math is
unit-tested on CPU with a fake model: per-item rows, multi-subgrid offsets,
deepstack column split + value alignment, plain (no-deepstack) path, and the
token-count mismatch guard. The DisaggEncodeExecutor wrapper + Mooncake encode
manager follow once the transport manager lands.
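
The index math can be illustrated with plain Python lists. This is a sketch, not `assign_encoded_embeddings` itself: the function name `split_packed_output` is hypothetical, and the column layout (main columns first, deepstack columns after) is an assumption standing in for whatever `model.separate_deepstack_embeds` actually does.

```python
from typing import List, Sequence, Tuple

Row = List[float]


def split_packed_output(
    packed: Sequence[Row], token_counts: Sequence[int], hidden: int
) -> List[Tuple[List[Row], List[Row]]]:
    """Split [sum_tokens, width] rows per item, then columns into main/deepstack."""
    if sum(token_counts) != len(packed):
        # Token-count mismatch guard: a silent off-by-one here would hand
        # one item's rows to its neighbour.
        raise ValueError(
            f"expected {sum(token_counts)} rows, tower produced {len(packed)}"
        )
    results = []
    cursor = 0
    for count in token_counts:
        rows = packed[cursor:cursor + count]
        main = [list(row[:hidden]) for row in rows]       # [N, hidden]
        deepstack = [list(row[hidden:]) for row in rows]  # [N, width - hidden]
        results.append((main, deepstack))
        cursor += count
    return results
```

For a model without deepstack, `width == hidden` and every deepstack row comes back empty.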

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Loads a real Qwen3.5 checkpoint through the engine's own runner (the same
DistributedInitializer + create_model_runner the encode worker will reuse) and
checks get_image_feature's contract on a synthetic image: output is
[post_merge_tokens, out_hidden] with post_merge = t*(h/m)*(w/m), and the
deepstack column split for deepstack models.

Not a CI unit test (needs a GPU + a multi-GB checkpoint); exits cleanly when
either is missing. Validated on /scratch/models/Qwen/Qwen3.5-2B: grid [1,4,6]
-> 24 patches -> 6 post-merge tokens -> get_image_feature [6, 2048] bf16
(ndeep=0). This confirms the ViT contract the encode executor's scatter relies
on, on real weights.
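
The shape arithmetic above can be checked by hand. The helper names below are hypothetical, and the spatial merge size m=2 is an assumption inferred from the 24 -> 6 reduction reported above.

```python
def num_patches(grid_thw):
    """Patch count before merging: t * h * w."""
    t, h, w = grid_thw
    return t * h * w


def post_merge_tokens(grid_thw, merge_size):
    """Token count after spatial merging: t * (h/m) * (w/m)."""
    t, h, w = grid_thw
    if h % merge_size or w % merge_size:
        raise ValueError("grid height and width must be divisible by merge_size")
    return t * (h // merge_size) * (w // merge_size)


# grid [1, 4, 6] with an assumed merge size of 2
print(num_patches((1, 4, 6)), post_merge_tokens((1, 4, 6), 2))  # 24 6
```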

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Add the encode<->prefill embedding transport managers on top of the wire
protocol + sender:

- MooncakeEmbeddingManagerEncode (data source): registers to the bootstrap
  server, receives the receiver's registration + per-request pre-alloc frames,
  and a worker thread issues the one-sided mooncake write (reusing the engine's
  transfer_sync / batch_transfer_sync verbatim). Asserts the n_tokens/hidden/
  dtype contract before the unchecked write.
- MooncakeEmbeddingManagerPrefill (data sink) + MooncakeEmbeddingReceiver:
  discovers the encode endpoint via the bootstrap server, registers + pre-allocs
  its receive buffer, and consumes completion-status frames. TP=1 1:1 for now.

Direction is reversed vs prefill->decode: encode registers/listens, prefill
discovers/pushes its buffer info, encode writes. Lean adaptation of
prefill.py/decode.py/receiver.py without the KV/layerwise/mamba machinery.

Validated end to end (test/runtime/manual_validate_embedding_e2e.py): 2
processes, encode sends a known GPU pattern, prefill's GPU recv buffer gets the
exact bytes (sum match). Runs over mooncake's TCP fallback (no RDMA HCA on this
box); the register/transfer API is transport-transparent, so correctness
carries to RDMA. mooncake is an optional dep (pip mooncake-transfer-engine);
the validator skips when it or CUDA is absent.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
DisaggEncodeExecutor ties the encode worker together (option A, Python
orchestration): execute() groups items by modality, runs the vision tower once
per modality via the model's get_image_feature/get_video_feature, scatters the
output onto item.encoded (assign_encoded_embeddings), registers each source
buffer with the mooncake engine, and ships it through the per-request
MooncakeEmbeddingSender. register() wires a request to its prefill peer's
bootstrap (host, port, room); poll() delegates to the sender. The PD-event /
event-loop integration is task #4; this is driveable directly.

Validated end to end with real weights
(test/runtime/manual_validate_encode_e2e.py): the encode process loads
Qwen3.5-2B, runs the real ViT on a synthetic image, and ships the [6, 2048]
bf16 embedding; the prefill process loads NO model, sizes its recv buffer from
the grid + config hidden, and receives the embedding byte-for-byte identical
(bytes_match=True) over the mooncake TCP fallback. This exercises ViT + scatter
+ executor + transport + receiver together.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
EncodeWorker is the body the engine's encode event loop drives, sitting between
request arrival and the vision tower. submit() registers a request's transfer
peer and, per item, either resolves the embedding from the EmbeddingCache (skip
the tower, still ship via executor.send_item) or queues it on the
EncodeScheduler. step() runs one deterministic batch through the tower +
transfer and populates the cache. Batching and caching -- the two things the
encode role needs that the C++ KV scheduler does not provide -- live here in
plain Python.

The executor's send_item is now public (used for cache hits, which skip the
tower but must still transfer). The class is injected with executor / scheduler
/ cache so it is unit-tested with fakes (no GPU/transport): cache miss runs the
tower once and caches; a duplicate image hits the cache and ships without
re-running the tower; distinct items batch together; the token budget splits
batches; register fires per request.

Model load, mooncake manager construction, request transport and the event-loop
wiring are the engine integration; this orchestrator only sequences them.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Add the per-item encode->prefill handshake field that the SMG grpc_servicer
populates for EPD requests: a list of {item_index, bootstrap_room,
bootstrap_host, bootstrap_port}, one per image. When set, the precomputed mm
inputs carry metadata only (no pixel feature); the prefill fills each item's
encoded tensor from the Mooncake transfer keyed by bootstrap_room instead of
running the vision tower. Threaded through __getitem__ so n>1 parallel
samples of a prompt share it (same as precomputed_multimodal_inputs).

This is the io_struct half of the EPD prefill receive glue (task #5); the
forward-side receiver hook lands next.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add receive_encoded_embeddings: the exact inverse of
assign_encoded_embeddings. For each handshaked image it sizes a receive
buffer to the item's post-merge token count x hidden (+ deepstack columns),
registers it with that item's encode worker via MooncakeEmbeddingReceiver,
waits for the push, and assigns onto item.encoded / item.encoded_deepstack --
the skip-ViT form the prefill VisionEmbedder already consumes (it never
re-encodes an item whose encoded is set). dtype is forwarded verbatim so it
matches the encode side's pre-write assert.

A receiver_factory seam keeps the buffer-sizing + assignment (the
silent-corruption-risk part) unit-testable on CPU with a fake receiver; 3
tests cover no-deepstack sizing, deepstack column width, and the
item_index->item mapping with out-of-order handshakes. The real byte transfer
stays covered by the manual_validate_encode_e2e GPU/Mooncake harness.

Wiring this into the real prefill forward (call site vs VisionEmbedder.apply)
and standing up MooncakeEmbeddingManagerPrefill is the event-loop layer
(task #4); this function is transport-agnostic via the factory seam.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Carry GenerateReqInput.encode_handshake along the same path the precomputed
mm items travel, so it reaches the forward index-aligned with the mm inputs:
- TokenizedGenerateReqInput.encode_handshake (set in InputProcessor next to
  multimodal_inputs).
- RequestState.encode_handshake (the per-request home, reachable by rid).
- MultimodalForwardContext.encode_handshakes: per-request list built in
  _get_multimodal_context_for_forward alongside mm_inputs (None when no
  request is EPD), each entry index-aligned with mm_inputs.

Steps 1-2 of the prefill receive integration (task #4). The manager init +
Path-4 receive hook follow.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Steps 3-4 of the prefill receive integration (task #4):
- EventLoop._maybe_init_embedding_manager: stand up a
  MooncakeEmbeddingManagerPrefill once per rank for a multimodal prefill
  node (dp_size=1/enable_dp_attention=False per the base manager's hard
  check; embedding_data_ptr=0, buffers allocated per image at receive). None
  for decode/encode/text-only nodes. Stored as self.embedding_manager,
  independent of pd_kv_transfer (a prefill is both the P->D KV source and the
  E->P embedding sink).
- EventLoop._maybe_receive_epd_embeddings called in _execute_forward_step
  Path 4 (prefill extend), after prepare_prefill and before the forward:
  fills item.encoded from Mooncake so the vision embedder skips the tower.
  Triple-guarded no-op for non-EPD / text-only. Synchronous (the async
  overlap needs the C++ FSM gate, task #8). Model args verified:
  model_runner.model.{config.hidden_size, num_deepstack_embeddings,
  visual.dtype}, model_executor.device.

KNOWN GAP (fail-loud guard added): the gateway splits per IMAGE (one
handshake/room per image) but the engine packs a whole request into ONE
MultimodalDataItem (images as multiple offsets). receive_encoded_embeddings
handles one handshake per item (single-image correct); a request with more
handshakes than items now raises NotImplementedError instead of mis-sizing.
Multi-image needs concatenating per-image embeddings into the item's encoded
by offset row-range (next).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resolve the per-image (gateway) vs per-request-item (engine) mismatch via
Option A: keep ONE MultimodalDataItem per request and concatenate each
image's embedding into that item's encoded by offset row-range. Chosen over
splitting the servicer into N items because compute_mrope_positions
(mrope.py:61-65) last-write-wins over mm_items for image_grid_thw and assumes
one item carries the full [N,3] grid -- N per-image items would break mrope on
the OCR-critical path. Option A keeps mm_items byte-identical to the validated
non-EPD multi-subgrid shape, so mrope/pad/hash/scatter cannot regress; the only
new logic lives in this isolated, CPU-tested module.

receive_encoded_embeddings now: a running global-image cursor maps each item's
offsets to consecutive global image indices (single-item request -> item 0 owns
all images); per item it allocates one [total_tokens, hidden] buffer and
pre_allocs one receiver per image into its offset sub-range
recv_main[src_cursor:src_cursor+span] (mirroring the embedder's src_cursor),
then assigns item.encoded once after all images land. Single-image is N=1 of
this path (byte-identical; the 3 existing tests pass unchanged). The
NotImplementedError guard is gone.

3 new CPU tests: 3-image concat (sizes + contiguous sub-range pointers in
offsets order), deepstack sub-ranges, and offset-order placement under
out-of-order handshake arrival.
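
A small sketch of the cursor logic, using lists in place of tensors. `plan_receive_ranges` and `place` are hypothetical names, and `item_spans` (per-item lists of per-image token counts) is an assumed simplification of the item's offsets.

```python
def plan_receive_ranges(item_spans):
    """Map each global image index to (item index, row start, row end)."""
    plan = {}
    image_index = 0  # running global-image cursor across all items
    for item_idx, spans in enumerate(item_spans):
        src_cursor = 0  # row cursor inside this item's buffer
        for span in spans:
            plan[image_index] = (item_idx, src_cursor, src_cursor + span)
            src_cursor += span
            image_index += 1
    return plan


def place(plan, buffers, image_index, rows):
    """Write one image's rows into its planned sub-range, whatever the arrival order."""
    item_idx, start, end = plan[image_index]
    if len(rows) != end - start:
        raise ValueError("row count does not match the planned sub-range")
    buffers[item_idx][start:end] = rows


# One item holding three images of 2, 3 and 1 tokens; handshakes arrive out of order.
plan = plan_receive_ranges([[2, 3, 1]])
buffers = [[None] * 6]
place(plan, buffers, 2, ["c"])
place(plan, buffers, 0, ["a", "a"])
place(plan, buffers, 1, ["b", "b", "b"])
print(buffers[0])  # ['a', 'a', 'b', 'b', 'b', 'c']
```

Since the sub-range is fixed by the plan rather than by arrival order, placement stays in offsets order no matter which image lands first.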

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Extend manual_validate_encode_e2e: the prefill role now drives the REAL
receive_encoded_embeddings (the same call the Path-4 forward hook makes) with
a no-fixed-buffer manager (EmbeddingArgs ptr=0), instead of a bespoke
receiver. The glue allocates the per-item buffer, drives the receiver, and
assigns item.encoded.

Validated on GPU (8xB200): encode loads Qwen3.5-2B, runs the real vision
tower -> ships [6,2048] bf16 over Mooncake (TCP fallback, no RDMA HCA);
prefill's receive_encoded_embeddings receives byte-for-byte identical
(bytes_match=True). Resolves the open registration question: the glue's
internally-allocated buffer is not explicitly engine-registered and the
transfer still succeeds over TCP, so TCP needs no registration (RDMA note
kept). The full engine-side EPD prefill receive path is now end-to-end
validated against real Mooncake.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Make the vision-only ENCODE role launchable: run_event_loop branches to a new
lightweight run_encode_loop when disaggregation_mode == 'encode', instead of
building the full EventLoop (KV/LM scheduler is an impedance mismatch for a
ViT). run_encode_loop assembles the encode worker exactly like the GPU-
validated manual_validate_encode_e2e encode role (ModelConfig + distributed
init + create_model_runner -> model.visual via DisaggEncodeExecutor, Mooncake
encode manager + its own bootstrap server, EncodeScheduler + EmbeddingCache +
EncodeWorker), then drains EncodeRequests off the SAME scheduler-input ZMQ
channel the LM uses (recv_pyobj on port_args.scheduler_input_ipc_name) into
EncodeWorker.submit and runs step() to encode + ship over Mooncake. Sends the
'ready' envelope so the launcher unblocks.

This is the engine half of the encode launch path (part 1 of 3); the smg
grpc_servicer TokenSpeedEncoder handler that sends EncodeRequests over this
channel + the encode-mode launch (parts 2-3) and the full GPU/3-worker e2e
follow. Manager is dp_size=1/enable_dp_attention=False (TP=1 transport).
Vision-only model construction (skip the 8B LM load) is a follow-up; today it
loads the full model and uses its tower, like the validated harness.
Import-verified; structure mirrors the validated encode role.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Send an EPD EncodeRequest to the encode-worker scheduler subprocess over the
same scheduler-input ZMQ channel the LM uses (the encode subprocess runs
run_encode_loop). Fire-and-forget; the gateway gets only a gRPC ack, the
embeddings flow encode->prefill over Mooncake. CPU multimodal tensors pickle
over ZMQ directly (shm publishing is a follow-up). Used by the smg
grpc_servicer's TokenSpeedEncoder handler.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…sters

EPD finding A: the encode-side embedding transfer worker shipped as soon as
the ViT ran, with no gate that the prefill receiver had registered. When a
chunk was dequeued before the prefill's EmbeddingTransferInfo (pre_alloc)
arrived, transfer_infos[room] was empty, so _transfer_worker iterated nothing
and silently discarded the chunk (destructive popleft, no re-queue). The room
stayed Bootstrapped forever and the prefill timed out after 60s
("embedding receive timed out ... last status 2"), failing every image
request. Reproduced in a live 8-worker EPD OCRBench run (acc 0.000 vs 0.500
aggregated) and now in a forced-race harness.

Park-and-flush: when transfer_infos[room] is empty, _transfer_worker parks
the chunk (keyed by room, stamped with bootstrap_time_out) instead of dropping
it; the bootstrap thread flushes a room's parked chunks back onto the queue
once its EmbeddingTransferInfo arrives. A reaper fails rooms whose parked
chunk outlives bootstrap_time_out so a never-registering prefill cannot hang
the encode side. The happy path (info-before-pop) and the deepstack/multi-room
send paths are unchanged.

Tests: 4 unit tests (park->flush->Success, happy-path unchanged,
multi-room+deepstack isolation, never-registered->Failed) plus an EPD_FORCE_RACE
mode in the manual encode e2e harness that reproduces the drop on the old code
(60s timeout) and passes on the fix (bytes_match) over real ViT + Mooncake.
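
The park-and-flush behaviour can be modelled without threads or RDMA. The class below is a hypothetical single-threaded simulation: `ParkAndFlushSketch` and its method names are illustrative, time is passed in explicitly, and "sending" only appends to a list.

```python
from collections import defaultdict, deque


class ParkAndFlushSketch:
    """Toy model of a transfer worker that parks chunks instead of dropping them."""

    def __init__(self, bootstrap_timeout: float) -> None:
        self.timeout = bootstrap_timeout
        self.queue = deque()              # (room, chunk) awaiting transfer
        self.transfer_infos = {}          # room -> list of receiver infos
        self.parked = defaultdict(list)   # room -> [(deadline, chunk)]
        self.sent = []                    # (room, info, chunk) records
        self.failed = set()               # rooms reaped after the deadline

    def enqueue(self, room, chunk) -> None:
        self.queue.append((room, chunk))

    def worker_step(self, now: float) -> None:
        if not self.queue:
            return
        room, chunk = self.queue.popleft()
        infos = self.transfer_infos.get(room)
        if not infos:
            # Receiver has not registered yet: park rather than discard.
            self.parked[room].append((now + self.timeout, chunk))
            return
        for info in infos:
            self.sent.append((room, info, chunk))

    def on_transfer_info(self, room, info) -> None:
        self.transfer_infos.setdefault(room, []).append(info)
        # Flush this room's parked chunks back onto the queue.
        for _, chunk in self.parked.pop(room, []):
            self.queue.append((room, chunk))

    def reap(self, now: float) -> None:
        expired = [r for r, entries in self.parked.items()
                   if any(deadline <= now for deadline, _ in entries)]
        for room in expired:
            del self.parked[room]
            self.failed.add(room)
```

In this model a chunk dequeued before registration is sent once the info arrives, and a room whose receiver never registers ends up in `failed` instead of hanging.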

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
In EPD the decode worker never runs the ViT or multimodal preprocessing, so its
request has no mm_input and mrope_position_delta is unset.
_build_mrope_positions_override then fell back to 1-D linear positions for every
decode token instead of the image-aware base+delta, so the decode query was
rope-rotated at the wrong position over the (correctly transferred) image KV ->
garbage from token-2 onward for image requests. Text was unaffected (delta=0).

Carry the per-request mrope_position_delta prefill->decode over the existing
Mooncake P->D bootstrap channel (sibling of bootstrap_token): stash at
on_first_token, ship as a TransferKVChunk field / 5th ZMQ frame (default 0,
backward compatible), and on the layerwise path route it through the
set_bootstrap_token aux_index handoff (the batch _decode->send path is skipped
for layerwise). The decode node synthesizes a minimal MultimodalInputs carrying
only the delta when delta != 0, and the mrope fallback gate is relaxed so a
delta-only mm_input takes the base+delta branch.

Validated: EPD with Qwen3.5-122B-A10B-NVFP4 (TP=1) reads OCRBench correctly
(EPD 0.88 vs aggregated 0.98); text and non-EPD paths byte-unchanged (delta=0).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
…robustness

The encode worker shipped item.encoded over Mooncake without synchronizing on
the ViT compute stream, so the RDMA read raced the tower kernels. Large images
(longer compute) shipped partially-written / garbage embeddings, which the
prefill consumed as a different image entirely -- silently cratering OCRBench
Doc-VQA (0.68 vs 0.91) and KIE (0.82 vs 0.93) while small images won the race
and matched the aggregated baseline. Synchronize the compute stream before the
send loop: EPD now matches aggregated (0.876 vs 0.877; Doc-VQA 0.925, KIE 0.94).

Also fixes the EPD startup races that made the cluster flaky to boot:
- encode _register_to_bootstrap did a single PUT and dropped the registration on
  the connection-refused race against the concurrently-starting bootstrap HTTP
  server, leaving /route parallel-info permanently null so every prefill
  handshake then failed (silent: the worker served no embeddings). Retry until
  the server accepts the registration.
- the receiver caches /route parallel-info that can come back with null fields
  before the encode worker finishes registering; re-fetch instead of caching the
  partial dict, which otherwise crashed every later request on int(None).

And makes the encode-side Mooncake registration size-aware (_ensure_registered):
the torch allocator reuses freed addresses at different sizes, so a reused
address must re-register when it grows or the transfer over-reads the stale MR.

Adds CPU unit tests for the size-aware registration.
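
A sketch of what size-aware registration could look like. `SizeAwareRegistry` is a hypothetical class, the `register`/`deregister` callbacks are injected stand-ins rather than Mooncake calls, and deregistering the stale region before re-registering is an assumption about the order of operations.

```python
class SizeAwareRegistry:
    """Tracks the registered size per address and re-registers on growth."""

    def __init__(self, register, deregister) -> None:
        self._register = register      # callable(ptr, nbytes)
        self._deregister = deregister  # callable(ptr)
        self._sizes = {}               # ptr -> registered nbytes

    def ensure_registered(self, ptr: int, nbytes: int) -> bool:
        """Return True if a (re-)registration was issued."""
        known = self._sizes.get(ptr)
        if known is not None and known >= nbytes:
            return False  # existing region already covers this buffer
        if known is not None:
            # Same address reused at a larger size: the old region is too small.
            self._deregister(ptr)
        self._register(ptr, nbytes)
        self._sizes[ptr] = nbytes
        return True
```

Keying on the address alone would treat the reused, larger buffer as already registered, which is the over-read described above.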

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
… sends

On a real RDMA NIC every transferred buffer must be a registered memory
region and Mooncake rejects OVERLAPPING registrations. Registering each
per-request `item.encoded` address fails: the torch caching allocator packs
freed-but-still-registered tensors together, so a later large image's grown
region straddles other registered addresses ("Transfer Engine does not
support overlapped memory region") -- the buffer ends up unregistered, the
one-sided RDMA write fails, and the prefill scheduler dies.

Collapse every send through a fixed RING of pre-registered bounce buffers:
each slot is registered exactly once at a fixed size (registrations never
grow, never overlap), and `item.encoded` is copied into the next slot before
its async send. `execute` stages a whole batch in ring-sized groups and
syncs once per group, so a slot is never reused before its group's sync.
`ring_slots`/`ring_bytes` are injectable so the registration + staging path
is unit-testable on CPU without a multi-GiB allocation.
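
The ring discipline can be sketched on CPU with `bytearray` slots. `BounceRing` and `stage_in_groups` are hypothetical names, and `register`, `send`, and `sync` are injected callbacks standing in for the engine registration, the async send, and the stream sync.

```python
class BounceRing:
    """Fixed ring of equally sized slots, each registered exactly once."""

    def __init__(self, ring_slots: int, ring_bytes: int, register) -> None:
        self.ring_bytes = ring_bytes
        self.slots = [bytearray(ring_bytes) for _ in range(ring_slots)]
        for index in range(ring_slots):
            register(index, ring_bytes)  # fixed size: never grows, never overlaps
        self._next = 0

    def stage(self, payload: bytes) -> int:
        """Copy payload into the next slot and return that slot's index."""
        if len(payload) > self.ring_bytes:
            raise ValueError("payload larger than a ring slot")
        index = self._next
        self.slots[index][:len(payload)] = payload
        self._next = (index + 1) % len(self.slots)
        return index


def stage_in_groups(ring: BounceRing, payloads, send, sync) -> None:
    """Stage ring-sized groups and sync once per group before reusing slots."""
    group_size = len(ring.slots)
    for start in range(0, len(payloads), group_size):
        for payload in payloads[start:start + group_size]:
            send(ring.stage(payload), len(payload))
        sync()  # every slot in this group is safe to overwrite afterwards
```

With two slots and five payloads this issues three syncs, and no slot is overwritten between its send and the sync that follows it.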

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The encode->prefill embedding transport was hard-limited to TP=1
(`assert local_tp == encode_tp == 1`), which blocked EPD on models too
large to fit a prefill rank on one GPU (e.g. Qwen3.5-397B-A17B-NVFP4,
~234GB).

Lift the limit to any equal TP degree with a 1:1 rank mapping. The vision
tower's output is TP-gathered (full and identical on every encode rank),
so encode rank r ships its embedding to prefill rank r; the prefill
receiver now pairs with its own tp rank instead of a hard-coded rank 0.

Make the encode loop TP-collective-safe: the vision tower is TP-sharded,
so only rank 0 binds the bootstrap server and reads the gateway ZMQ, then
broadcasts each batch to the attn TP cpu group so all ranks run the
(collective) tower in lockstep. The TP=1 path is unchanged.

Validated e2e at TP=4 on the 2-node B200 cluster (E + P + D each TP=4,
Qwen3.5-397B-A17B-NVFP4, OCRBench): accuracy parity with the aggregated
engine and throughput matching/exceeding it.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Under chunked prefill, a multimodal request's prefill spans more than one
Path-4 forward, and `_maybe_receive_epd_embeddings` runs every forward
with no guard for items already received. The manager's per-room status is
monotonic (max(), never lowered except Failed) and the sync receive path
never clears it on success, so on a re-receive of the same room (already at
Success) the phase-1 `_wait` for Bootstrapped can never match -> a 60s
`TimeoutError: embedding receive timed out ... last status 5, waiting for
{0, 2}`. This surfaced at conc=64 on the full OCRBench set (big images +
high concurrency push a single request's extend tokens past
chunked-prefill-size, forcing a multi-forward prefill); conc=8 and the
smaller subset never split, so they received once and passed.

Skip items whose `item.encoded` is already set (set only in phase 2 after
every image of the item reaches Success, so it reliably means fully
received). The guard sits after the global-image cursor advance so later
items still map to the right global indices, and mirrors `VisionEmbedder`,
which never re-encodes an item whose `encoded` is set.
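
The ordering of the guard relative to the cursor advance is the subtle part, so here is a sketch of it. `images_to_receive` is a hypothetical helper, and items are modelled as dicts with a `spans` list (per-image token counts) and an `encoded` value.

```python
def images_to_receive(items):
    """Return (item index, global image indices) for items still needing a receive."""
    cursor = 0  # global-image cursor shared by all items in the request
    todo = []
    for item_idx, item in enumerate(items):
        first = cursor
        cursor += len(item["spans"])  # advance BEFORE the guard
        if item["encoded"] is not None:
            continue  # already fully received on an earlier forward
        todo.append((item_idx, list(range(first, cursor))))
    return todo


items = [
    {"spans": [3, 5], "encoded": "already-received"},
    {"spans": [2], "encoded": None},
]
print(images_to_receive(items))  # [(1, [2])]
```

Had the guard run before the advance, the second item would have been mapped to global image 0 instead of 2.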

Add CPU regression tests (re-call on the same item is a no-op; mixed batch
skips only the encoded item) and a fake Mooncake manager so the existing
receive tests exercise the buffer register/deregister path.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…cast)

The encode<->prefill embedding transport previously required
encode_tp == prefill_tp. Lift that to prefill_tp being any multiple of
encode_tp, so the encode worker can run at a smaller TP (e.g. TP=1, scaled
horizontally by running multiple encode instances) while prefill runs at
the TP its (large) LM needs.

The vision tower output is TP-gathered (full and identical on every encode
rank) and each prefill rank needs the full embedding, so this is a 1->N
broadcast: contiguous blocks of `fanout = prefill_tp // encode_tp` prefill
ranks pair with one encode rank (prefill rank r -> encode rank r // fanout,
mirroring the KV path receiver), and each prefill rank receives the full
embedding. For encode_tp=1 all prefill ranks pair with encode rank 0.

The encode-side transfer worker now waits until all `fanout` receivers of a
room have registered before broadcasting (it previously sent to whoever was
registered and marked the room done, which under 1->N could ship to a subset
and strand late-registering ranks); it then sends to all, marks Success
once, and pops the room only after all N are served. The existing 1:1
(fanout=1) and TP=1 paths are unchanged.
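
The rank pairing and the wait-for-all gate reduce to a few lines of arithmetic. The function names below are hypothetical; only the formula `prefill rank r -> encode rank r // fanout` with `fanout = prefill_tp // encode_tp` is taken from the description above.

```python
def encode_rank_for(prefill_rank: int, prefill_tp: int, encode_tp: int) -> int:
    """Encode rank that a given prefill rank pairs with."""
    if prefill_tp % encode_tp != 0:
        raise ValueError("prefill_tp must be a multiple of encode_tp")
    fanout = prefill_tp // encode_tp
    return prefill_rank // fanout


def ready_to_broadcast(registered_prefill_ranks, fanout: int) -> bool:
    """The sender ships nothing until all `fanout` receivers of a room registered."""
    return len(set(registered_prefill_ranks)) >= fanout


print([encode_rank_for(r, 4, 1) for r in range(4)])  # [0, 0, 0, 0]
print([encode_rank_for(r, 8, 2) for r in range(8)])  # [0, 0, 0, 0, 1, 1, 1, 1]
```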

Add a CPU unit test asserting the worker ships nothing until all N receivers
register, then broadcasts to all and completes.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ncode role)

The encode worker (disaggregation_mode=="encode") runs only the vision
tower, but it previously built the full model (vision tower + the entire
LM) and loaded all weights. That is fine at encode TP=4 (the LM shards
across GPUs) but makes encode TP=1 impossible for a large VLM: e.g.
Qwen3.5-397B-A17B-NVFP4 (~234GB) cannot be allocated on one GPU.

Add an orthogonal `vision_tower_only` axis (the mirror of
--language-model-only, aligned with kimi_k25's existing `encoder_only`
gate), auto-derived from disaggregation_mode=="encode": build the vision
tower but skip constructing the LM (self.model / lm_head / logits_processor)
and skip loading all non-visual weights. `is_multimodal_active` stays True
so the vision tower (and its cudagraph) are built. No runner/pool change is
needed -- the encode loop never builds a ModelExecutor, so the KV/mamba
pools were already not allocated for the encode role.

This unlocks encode TP=1 (one full ViT per GPU, scaled horizontally via data
parallel) for large models. --language-model-only, prefill/decode/agg, and
the symmetric encode TP=4 path are unchanged (the two axes are mutually
exclusive; model_config raises if both are set).

Validated on the B200 cluster: Qwen3.5-397B-A17B-NVFP4 encode TP=1 + prefill
TP=4 + decode TP=4. The encode worker loads on a single GPU with ~1.6GB
resident (vision tower only, not ~234GB), and OCRBench accuracy matches the
aggregated engine (~0.86), confirming the vision-only embeddings are correct.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Remove debug instrumentation, dead branches, unused symbols, and one-off
validation scripts left over from building EPD. No behavior change to the
working feature; verified by py_compile and the 51 CPU unit tests
(test_embedding_transfer / encode_receiver / encode_worker / encode_scheduler
/ encode_executor).

- Drop the EPD_TIMING perf-investigation probes (encode_executor, encode_loop).
- Remove the orphaned AsyncLLM.submit_encode (superseded by run_encode_loop).
- Delete the never-false EmbeddingChunk.is_last path and the never-true
  EmbeddingTransferInfo.is_dummy path (12->11 ZMQ frames; tests updated).
- Drop the receiver's unread conclude_state/init_time and the now-unused
  DisaggEncodeExecutor.poll (the sender's timeout mirror is kept on purpose).
- Remove the inert C++ DisaggregationMode::kEncode enum + its nanobind binding;
  the runtime selects the encode role via the Python pd.utils enum. Re-add when
  the C++ kEncode FSM is actually built.
- Delete the three manual_validate_* GPU harness scripts; the contracts they
  exercised are covered by the committed CPU unit tests.
- Refresh stale "task #4"/"task #8" scaffolding comments.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e race fix

On the decode node, let a scheduler cycle that triggers a KV receive ALSO run a
pure-decode forward for already-decoding requests, instead of wasting the cycle
(legacy: receive cycles emitted zero decode tokens, starving decode to batch ~5).
The combined [extends | decodes] op is split: extend rows drive the RDMA receive,
the decode_only() projection (num_extends==0) drives the model forward, so the
model never sees a mixed op. The swap happens before _dp_sync_and_check, so the
DP-advertised token vector matches the forwarded tokens (no NCCL size mismatch).

Residual-crash fix: the in-cycle decode forward could consume the KV-receive
setup state (update_block_table on the COMBINED op / reset_valid_cache_length /
receive registration) before it was committed across the default and execution
streams, causing a nondeterministic CUDA illegal memory access under sustained
load (it surfaced async at the next sync, the mrope delta .to(); masked by
CUDA_LAUNCH_BLOCKING=1, the signature of a stream race rather than a logic/OOB
bug). Fence with torch.cuda.synchronize() in _kd_split_receive_and_decode after
the receive is triggered, before the decode-only forward consumes it. Fires only
on receive cycles (num_extends>0), so steady-state pure-decode cycles keep full
overlap.

Validated cross-node b200-80(prefill) <-> b200-77(encode+decode), TP=4,
Qwen3.5-397B-A17B-NVFP4, OCRBench full-1000 @ c64: 0 crashes, 0 FailedEvents,
acc 0.900 (== agg), decode batch grows to ~43-53, peak gen throughput ~3.5-4k
tok/s (vs starved ~590).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replace the synchronous busy-wait embedding receive (which blocked the prefill
event loop ~55% of wall, GPU util 7.9%) with a poll-driven admission gate.

- encode_receiver.py: EmbeddingReceiveJob (start/poll/release); per-image
  receivers preserved (a request's images may span multiple encode workers);
  item.encoded published only when every image reaches Success.
- event_loop.py: stage EPD requests out of the scheduler until their embeddings
  arrive; _drain_ready_epd_embeddings polls jobs each cycle and admits via a
  TP-rank-synced MIN all-reduce (FAILED<PENDING<DONE) over attn_tp_cpu_group so
  all ranks admit/abort together; P->D sender registration deferred to admission
  (registering pre-submit makes generate_events' requests_.at(rid) throw).
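
The rank-agreed admission rule above can be sketched as follows. This is a minimal illustration, not the event loop's code: `RecvState`, `agree`, and `admission_decision` are hypothetical names, and a plain `min()` over a list stands in for the MIN all-reduce over attn_tp_cpu_group.

```python
from enum import IntEnum


class RecvState(IntEnum):
    # Ordered so that a MIN across ranks picks the most pessimistic state.
    FAILED = 0
    PENDING = 1
    DONE = 2


def agree(local_states):
    """Stand-in for a MIN all-reduce: every rank ends up with the same minimum."""
    return RecvState(min(local_states))


def admission_decision(local_states):
    """All ranks take the same branch because they all see the same agreed state."""
    agreed = agree(local_states)
    if agreed is RecvState.DONE:
        return "admit"
    if agreed is RecvState.FAILED:
        return "abort"
    return "wait"
```

With this ordering, one rank still PENDING holds every rank back, and one FAILED rank aborts the request on all of them.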

Cross-node TP=4 397B validated. Known issue: a multi-encode-worker COLD start
hit by a concurrent flood can deadlock (encode loops block on the Mooncake send);
mitigated operationally by sequential warmup before high-concurrency traffic.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
chenht2022 and others added 18 commits June 15, 2026 19:28
Single-request lifecycle timing across encode/transfer/prefill/KV/decode, all
gated on the EPD_TL env var (zero overhead when unset). Used to locate the
coordination bottlenecks; kept for ongoing profiling (incl. the cold-start
deadlock investigation).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The encode->prefill embedding manager hardcoded bootstrap_time_out = 30s (the wait
for the prefill receiver to register before the encode sends). The PD KV path reads
TOKENSPEED_DISAGGREGATION_BOOTSTRAP_TIMEOUT (default 120s, prefill.py:117) for the
analogous prefill->decode bootstrap. Reuse that knob so the EPD bootstrap wait
matches PD instead of an arbitrary 30s (one operator knob for both registration
waits; pairs with 1cf6199, which aligned the transfer-wait to WAITING_TIMEOUT=300s).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A ring slot was reused after ring_slots more stages with no check that its
previous occupant's Mooncake send had concluded. The "ViT cadence covers
the prior write" assumption breaks once sends queue or park: a parked chunk
(late receiver registration) holds the slot's pointer for up to
bootstrap_time_out and is re-sent on flush, so a fast producer could
overwrite an in-flight embedding -- the same silent-corruption class as the
ViT->send race.

Track the room last staged into each slot and block reuse until that room's
transfer is terminal (Success/Failed/reaped) and unparked, failing loud past
the parking deadline instead of corrupting. A single dict probe per image in
the steady state. Cross-node OCRBench unchanged (0.894 vs 0.886-0.906 band).
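
A rough sketch of the per-slot reuse check, under simplifying assumptions: `RingGuard` and its fields are hypothetical, statuses are plain strings, and the sketch raises immediately where the real guard waits up to the parking deadline before failing.

```python
TERMINAL = {"Success", "Failed", "reaped"}


class RingGuard:
    def __init__(self, ring_slots):
        self.last_room = [None] * ring_slots  # room last staged into each slot
        self.status = {}                      # room -> transfer status (hypothetical store)
        self.parked = set()                   # rooms that still hold a parked chunk

    def reusable(self, slot):
        """A slot is reusable once its last room is terminal and not parked."""
        room = self.last_room[slot]
        if room is None:
            return True  # never used
        return self.status.get(room) in TERMINAL and room not in self.parked

    def stage(self, slot, room):
        """Claim a slot for a new room, refusing to overwrite an in-flight embedding."""
        if not self.reusable(slot):
            raise RuntimeError(
                f"slot {slot} still owned by in-flight room {self.last_room[slot]}"
            )
        self.last_room[slot] = room
        self.status[room] = "Transferring"
```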

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Under EPD_PIXEL_SHM the encoder servicer ships each image's pixels to this
process as a POSIX-SHM handle instead of pickling the raw 19-77MB tensor
over the scheduler ZMQ socket (at 4K the pickle + recv copies alone were
~480-790ms per image, the dominant ingest cost after the NIXL pixel path).

At the cache fork in submit(): a miss materializes the handle into pinned
memory (consume() also unlinks, so segments never outlive the item -- and
pinned gives the ViT H2D a real non_blocking copy); a hit never needs the
pixels, so the segment is just consumed-and-dropped to unlink (a copy-free
release() can replace this once the deferred-consume lifecycle API lands).
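
The handle-passing idea can be illustrated with the standard library alone. This is a sketch under assumptions: `publish` and `consume` are hypothetical helpers, the handle is a `(name, size)` tuple, and the payload is copied into `bytes` rather than pinned memory.

```python
from multiprocessing import shared_memory


def publish(payload: bytes) -> tuple[str, int]:
    """Producer side: copy the payload into a new segment and return its handle."""
    seg = shared_memory.SharedMemory(create=True, size=len(payload))
    seg.buf[:len(payload)] = payload
    seg.close()  # drop this mapping; the named segment itself persists
    return seg.name, len(payload)


def consume(handle: tuple[str, int]) -> bytes:
    """Consumer side: copy the bytes out, then unlink so the segment cannot leak."""
    name, size = handle
    seg = shared_memory.SharedMemory(name=name)
    try:
        return bytes(seg.buf[:size])
    finally:
        seg.close()
        seg.unlink()
```

Only the short handle has to cross the ZMQ socket; the size travels with it because some platforms round the segment up to a page multiple.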

Cross-node 2x4K (E4): 7.85 -> 12.02 req/s with the servicer side enabled;
8x1080p (E8) 10.63 -> 14.03 req/s; /dev/shm steady (no leak); OCRBench
accuracy unchanged (0.888, band 0.886-0.906).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
…tail

Deregistering an embedding receive MR the instant its Success notif arrives
races the responder HCA's DMA placement pipeline: the encode side's
transfer_sync CQE guarantees the transport ACK, not local placement, and
under load (>10 req/s, deep PCIe/NIC queues) the tail stretches to
milliseconds. The immediate dereg then lands mid-placement and the QP dies
with 'local access violation work queue error' (~2-3% of images), forcing a
Mooncake session re-establish and a visible throughput dip; the same race on
the _fail() path can hit a SIBLING image's in-flight write into the shared
item buffer.

Deregistration now goes through a lazy queue with a grace period
(EPD_RECV_DEREG_DELAY_S, default 0.5s). Entries hold the tensor ref, so the
allocator cannot reuse a still-registered address (the no-double-register
invariant is preserved in a stronger form). Sweeps piggyback on job
construction and poll(), both on the scheduler loop: no new thread, no lock,
and the dereg cost also moves off the completion hot path.
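
A minimal sketch of such a lazy queue, assuming a hypothetical `dereg` callback that unregisters one buffer and an injectable clock for testing; the class and method names are illustrative only.

```python
import time
from collections import deque


class LazyDeregQueue:
    def __init__(self, delay_s=0.5, dereg=lambda buf: None, clock=time.monotonic):
        self.delay_s = delay_s
        self.dereg = dereg      # hypothetical callback that deregisters the MR
        self.clock = clock
        self.pending = deque()  # (due_time, buffer); the held ref keeps the address alive

    def defer(self, buf):
        """Queue a buffer for deregistration after the grace period."""
        self.pending.append((self.clock() + self.delay_s, buf))

    def sweep(self):
        """Deregister every entry whose grace period has elapsed; return the count."""
        now, n = self.clock(), 0
        while self.pending and self.pending[0][0] <= now:
            _, buf = self.pending.popleft()
            self.dereg(buf)
            n += 1
        return n
```

Calling `sweep()` from job construction and `poll()` keeps the work on the scheduler loop, so the sketch needs no thread or lock either.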

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Per-request register/deregister of E->P embedding receive buffers is the
mode-2 root cause: a recycled allocator address re-registers under a NEW
rkey, but the encode side's Mooncake segment cache keeps resolving that
range to the OLD rkey until a transfer fails, so every reuse risks a
'local access violation work queue error' that kills the QP and rides the
error-retry path (903 violations / cold cc108 run; p99 TTFT +50%). Bisected
by deregistration delay: 0.5s/5s/never -> 903/512/0 violations.

Receive targets now lease slots from a pool registered ONCE for the
engine's lifetime (EPD_RECV_POOL_SLOTS x EPD_RECV_POOL_SLOT_MB, default
16x256MB): the sender's cached rkey can never go stale, and the
per-request register/deregister GIL cost leaves the scheduler hot path.
DONE clones the landed rows onto item.encoded (~0.05ms) and returns the
slot; FAILED slots sit in quarantine (EPD_RECV_POOL_QUARANTINE_S, 10s) --
under a lifetime MR a still-in-flight write would otherwise land silently
in the next tenant's data. Deepstack items, oversized items, exhaustion,
or EPD_RECV_POOL_SLOTS=0 fall back to the legacy per-request path (kept
with its lazy deregistration).
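
The lease/return/quarantine lifecycle might look like the sketch below. It models only slot bookkeeping: `RecvPool` and its methods are hypothetical, slots are integer ids, and memory registration is not shown.

```python
import time


class RecvPool:
    def __init__(self, slots=16, quarantine_s=10.0, clock=time.monotonic):
        self.free = list(range(slots))  # slot ids; the real pool registers these once
        self.quarantined = []           # (release_time, slot)
        self.quarantine_s = quarantine_s
        self.clock = clock

    def lease(self):
        """Return a slot id, or None so the caller can fall back to the legacy path."""
        now = self.clock()
        ready = [s for due, s in self.quarantined if due <= now]
        self.quarantined = [(due, s) for due, s in self.quarantined if due > now]
        self.free.extend(ready)
        return self.free.pop() if self.free else None

    def release_done(self, slot):
        """DONE: rows were cloned out, so the slot is immediately reusable."""
        self.free.append(slot)

    def release_failed(self, slot):
        """FAILED: a late write may still land, so hold the slot back for a while."""
        self.quarantined.append((self.clock() + self.quarantine_s, slot))
```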

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Re-adds the method ecf2eff introduced and c297c26 removed as
"experimental/dead" code: its only caller is the smg grpc servicer's
TokenSpeedEncoder handler (encoder_servicer._ingest), which lives in the
gateway repo, so an engine-side dead-code sweep cannot see it. Without
this the offloop pixel-ingest servicer crashes on every Encode RPC
(AttributeError) and EPD serves no embeddings; the method has been
running as uncommitted drift on the cluster deployments since.

Fire-and-forget over the scheduler-input channel: the encode subprocess
runs run_encode_loop (not the LM EventLoop) and ships the resulting
embeddings to the prefill over Mooncake.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
… models

EPD_TOLERANT_LOAD=1 skips checkpoint expert weights whose target layer
does not exist in the (truncated) model instead of raising, so
reduced-layer capacity-probe builds (the 6L/4L light models used for
EPD pipeline stress benches) can load from a full checkpoint. Default
off = the strict unloaded-match/unmatched checks are unchanged. Has
been running as uncommitted drift on the cluster deployments.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
…t encode sender

Eliminates the E->P wire fanout: with prefill attn-TP=M the encode worker
previously RDMA-wrote the FULL per-image embedding to every prefill rank
(M x amplification, 536MB/req at M=4 for 8x1080p) through a single
synchronous sender thread.

O1, concurrent sender: K room-sharded queues (room % K keeps one consumer
per room, preserving the park/flush/straggler-drop ordering; the PD
dst-session-port hash would degenerate to one queue here) + a per-queue
pool issuing one room's per-receiver writes concurrently (each receiver is
a distinct Mooncake session; one batch call cannot span them). Status
pushes move to thread-local ZMQ sockets (the @cache-shared socket is not
thread-safe). On any failure every receiver now gets a Failed push. The
encode worker sweeps concluded senders (request_status tombstones stay:
the straggler-drop and the ring lease key on them).

O2, row sharding: each prefill rank pre_allocs only its shard_rows()
sub-range PER IMAGE (per-item sharding would broadcast rows a rank never
received), with dst pointers offset to the shard's first row. The wire
n_tokens now carries the shard's row count: a pre-shard encode fails its
n_tokens == chunk.n_tokens check loud instead of writing a full span at a
shard-offset pointer. Capability is negotiated via the bootstrap
parallel-info (embedding_shard_version); receivers fall back to identity
frames for non-advertising pools, and zero-row shards still send their
pre_alloc as the registration heartbeat the encode fanout gate counts.
The encode side validates the fanout set tiles contiguously before any
write, and slices the staged ring chunk per frame (cache-hit path
included, no executor change). Reassembly = per-image sub-range NCCL
broadcasts in the admission drain's rank-agreed DONE branch (after the
gloo MIN, before the P->D register/submit), src = GLOBAL rank from
mapping.attn.tp_group; the communicator is warmed at startup. The
collective relies on the prefill's non-overlap loop for cross-rank
launch-order consistency. The blocking receive backstop fails loud under the
shard flag (it would publish shard-only rows).
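
One way to picture the per-image row split and the contiguity check is the sketch below. It assumes an even contiguous split in which earlier ranks absorb the remainder; the actual shard_rows() policy may differ, and `split_rows` / `tiles_contiguously` are hypothetical names.

```python
def split_rows(n_rows, rank, world):
    """Contiguous [start, end) row range for `rank`; may be empty when n_rows < world."""
    base, extra = divmod(n_rows, world)
    start = rank * base + min(rank, extra)
    end = start + base + (1 if rank < extra else 0)
    return start, end


def tiles_contiguously(ranges, n_rows):
    """True if the ranges cover [0, n_rows) with no gap and no overlap."""
    cursor = 0
    for start, end in sorted(ranges):
        if start != cursor or end < start:
            return False
        cursor = end
    return cursor == n_rows
```

Zero-row shards such as `(2, 2)` still pass the check, which matches ranks that only send a registration heartbeat.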

Gated by TOKENSPEED_EPD_EMBEDDING_SHARD (default off = byte-identical
behavior). Encode egress drops by a factor of M (536->134MB/req at M=4); per-rank
ingress drops to 1/M; the NVLink reassembly is ~100MB/req intra-node.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
…its main half

Adversarial-review fixes on the row-shard commit:

1. validate_fanout_frames lost the old per-frame n_tokens == chunk.n_tokens
   guard: a lone under-span frame (fanout==1, the symmetric encode/prefill
   pairing) validated and silently truncated the transfer where the old
   worker failed the room loud. The v2 frame now also carries the image's
   FULL span (appended, length-guarded like row_start); the encode requires
   span == chunk.n_tokens per frame, and legacy span-less frames (pre-shard
   prefills, which only ever describe the full image) get the old strict
   equality back. Loud G2 contract restored for every fanout geometry.

2. EmbeddingCache cached only item.encoded: a hit on a deepstack model
   shipped without the deepstack half while the prefill published its
   never-written deepstack buffer (pre-existing, surfaced by the review).
   The cache now stores the (main, deepstack) pair and a hit restores both;
   validate_fanout_frames additionally rejects deepstack-presence mismatch
   in both directions as defense in depth.

3. TOKENSPEED_EPD_EMBEDDING_SHARD torn across attn-TP ranks now fails loud
   at boot (gloo agreement check) instead of hanging the warmup broadcast.

4. The sharded-mode backstop only requires items that OWN a handshaked
   image to be encoded (the receive job deliberately leaves un-handshaked
   items to the vision tower; the backstop now uses the same global-image
   cursor mapping).

5. The poison-pill test now exercises a replica of the pre-shard encode's
   equality predicate over a shard grid instead of asserting its own
   fixture; new tests cover the single-frame span mismatch (v2 + legacy)
   and the deepstack mismatch/cache-hit paths.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
Flips TOKENSPEED_EPD_EMBEDDING_SHARD to default-true after the full
validation matrix passed cross-node (b200-80 prefill <-> b200-77
encode/decode, 397B-NVFP4, greedy):

- TP4 and TP8 prefill: OCRBench accuracy parity in every phase pair
  (TP8: 0.875 == 0.875), zero embedding timeouts, zero contract
  failures, multi-image greedy probe byte-stable.
- Heavy-vision soak (8x1080p, np400, conc48, TP8): 400/400 in both
  modes; encode-node egress 511.9 GB -> 72.3 GB, i.e. exactly 8.0x on
  the embedding component (the residual is decode token backhaul).
- Fault drill (kill -9 one of four encode workers mid-soak): zero
  prefill exceptions or NCCL stalls (aborts resolve BEFORE the drain
  collective by the rank-agreed MIN), bounded request loss (2 requests
  in flight on the dead worker), gateway healthy, remaining workers
  kept serving (post-drill cell acc 0.950).

The flag stays as the kill-switch (=0 restores full-copy wire
behavior); at attn-TP 1 sharding is inert, and pools that do not
advertise shard support still negotiate down to identity frames.

Signed-off-by: chenht2022 <chenht2022@gmail.com>
…encode

The encode path's grid lookup was the lone outlier still using the
'grid_thws' model_specific key; mrope, the encoder servicer, and the
Qwen path all use 'image_grid_thw'. With the smg processor now emitting
'image_grid_thw' uniformly, align this read. Fixes 'MultimodalDataItem
has no attribute grid_thws' on K2.5 EPD encode.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… aggregated)

The EPD path carries the grid under image_grid_thw (smg processor) while
the aggregated path carries grid_thws (engine preprocessing). Reading only
image_grid_thw crashed the aggregated worker. Accept either key.
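
A small sketch of the either-key read, assuming the model-specific data behaves like a dict (the real item may expose attributes instead); `read_image_grid` is a hypothetical helper.

```python
def read_image_grid(model_specific):
    """Return the grid under either key; the EPD key is tried first."""
    for key in ("image_grid_thw", "grid_thws"):
        value = model_specific.get(key)
        if value is not None:
            return value
    raise KeyError("no image grid under 'image_grid_thw' or 'grid_thws'")
```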

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…releasing batch transfer

Resolves the Kimi-K2.5 EPD concurrency cliff (conc8 partial / conc16+ all-503
deadlock with no recovery). The old _wait_slot_reusable busy-waited on a fixed
ring slot inside the single-threaded encode step(); when a slot's room concluded
slowly (large ~308MB embedding, slow cross-process receiver registration) the
loop stopped draining ZMQ and answering gRPC pings -> Code::Unavailable -> 503,
and never recovered.

- _lease_slot(): non-blocking scan for ANY reusable slot (room TERMINAL + not
  parked), returns None when full instead of spinning. Same overwrite-safety
  invariant, minus the busy-wait.
- _deferred_sends + drain_deferred(): a full ring DEFERS the send and retries
  each tick; the loop keeps draining ZMQ / answering pings.
- encode_worker.step: backpressure (skip next_batch while sends are deferred) so
  the ring can't accumulate an unbounded backlog of unshippable embeddings (OOM).
- encode_loop: yield the GIL while has_deferred so the transfer daemons free slots.
- _send: always use batch_transfer_sync (releases the GIL for the one-sided write)
  even for the single-buffer no-deepstack (Kimi) path.
- ring depth / slot bytes env-tunable (TOKENSPEED_EPD_ENCODE_RING_SLOTS/_SLOT_MB).
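
The non-blocking lease plus deferral described in the list above can be sketched like this. It is a toy model: `Ring` is hypothetical, a boolean per slot stands in for "room terminal and not parked", and the actual transfer is reduced to appending to a list.

```python
from collections import deque


class Ring:
    def __init__(self, n_slots):
        self.free = [True] * n_slots  # True = previous occupant terminal and not parked
        self.deferred = deque()       # rooms whose send is waiting for a slot
        self.sent = []                # (room, slot) pairs that were shipped

    def lease_slot(self):
        """Non-blocking scan: any reusable slot index, or None when the ring is full."""
        for i, ok in enumerate(self.free):
            if ok:
                self.free[i] = False
                return i
        return None

    def send(self, room):
        slot = self.lease_slot()
        if slot is None:
            self.deferred.append(room)  # defer instead of spinning
            return False
        self.sent.append((room, slot))
        return True

    def drain_deferred(self):
        """Retry deferred sends once per tick; stop as soon as no slot is free."""
        while self.deferred:
            slot = self.lease_slot()
            if slot is None:
                return
            self.sent.append((self.deferred.popleft(), slot))

    def has_deferred(self):
        return bool(self.deferred)
```

A loop built on this can skip pulling the next batch while `has_deferred()` is true, which is the backpressure rule in the list.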

Validated cross-node TP4 (Kimi-K2.5-NVFP4): conc 4/8/16/32 = 20/20, 40/40, 80/80,
160/160; recovers post-burst; OCRBench 0.840 (no regression, no NaN). 17 unit tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
_route_get() called requests.get(timeout=5) with no try/except, so it
returned None only for a non-200 response. A genuinely unreachable or
non-responding bootstrap/encode endpoint (worker restarting, slow to
register, a 5s timeout firing under burst, a network blip) RAISES
ConnectionError/Timeout instead. That exception escaped the receiver
__init__ -> EmbeddingReceiveJob.start -> _process_new_requests on the
prefill scheduler thread, hit the event-loop top-level handler, and
SIGUSR1'd the whole prefill worker -- taking the entire TP group and
every in-flight request with it. Exposure scales with the encode pool
size: any one of n endpoints hiccuping can drop a prefill worker.

The receiver's two call sites already handle None (record_failure ->
KVPoll.Failed -> rank-agreed abort via the drain MIN all-reduce); only
the exception path was unhandled. Wrap the fetch in try/except -> None,
mirroring the KV path's _get_bootstrap_info_from_server (receiver.py),
so a dead endpoint fails the room cleanly instead of the engine.
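
The shape of the fix is roughly the following. To stay self-contained the sketch takes the HTTP getter as a parameter (in the real code this is requests.get) and catches `Exception` broadly, which is an assumption; `route_get` is a hypothetical stand-in for _route_get.

```python
def route_get(url, http_get, timeout=5):
    """Return the JSON body on HTTP 200, else None; never raise on transport errors."""
    try:
        resp = http_get(url, timeout=timeout)
    except Exception:  # e.g. a connection error or timeout from the HTTP client
        return None
    if resp.status_code != 200:
        return None
    return resp.json()
```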

Tests: 3 cases for _route_get (transport exception, non-200, 200-json).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
A per-item contract violation in the tower step raised straight out of the
encode loop into the engine's top-level handler, which SIGUSR1s the whole
encode worker: assign_encoded_embeddings raises ValueError when the
vision-tower output rows do not match the items' post-merge token count (a
grid/token-count contract drift -- exactly the class of K2.5 bring-up bug),
and _copy_into raises RuntimeError when one embedding exceeds a ring slot
(reachable on K2.5 once slots are tuned smaller for concurrency, or on a
video / multi-image item). Because the gateway round-robins images across
encode workers, one bad image took out every other in-flight request on that
worker, and with a fixed encode pool there is no failover.

Wrap executor.execute() in step(): on any exception, conclude every room in
the batch Failed via the manager's existing _fail_room (the receiver learns
through the rank-synced admission abort), drain the batch from _pending so the
loop never re-runs the bad item, log once, and return 0 so the loop keeps
serving other workers' images. Both raises fire before any send is issued
(ViT/assign, or ring staging which precedes the post-stage sync+send), so
failing the whole scheduler batch never poisons an already-shipped room.
Mirrors the per-room failure path already used on the encode manager's fanout
validation. Process death stays reserved for genuinely unrecoverable states.
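
A simplified sketch of the guarded step, with assumptions stated: `step` here is a free function, batch items are dicts with a `"room"` key, and `execute` / `fail_room` are injected callables standing in for executor.execute() and _fail_room.

```python
import logging


def step(batch, execute, fail_room, pending):
    """Run one encode batch; on any exception fail its rooms and keep the loop alive."""
    try:
        return execute(batch)
    except Exception:
        logging.exception("encode batch failed; concluding its rooms Failed")
        # dict.fromkeys de-duplicates while keeping order: each room fails once.
        for room in dict.fromkeys(item["room"] for item in batch):
            fail_room(room)
        # Drain the batch so the loop never re-runs the bad item.
        for item in batch:
            if item in pending:
                pending.remove(item)
        return 0
```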

+3 unit tests (ValueError concludes its room Failed without crashing; a failed
batch fails both its rooms and the loop still serves a later healthy batch; a
multi-image request fails its shared room exactly once). Full
test_encode_worker.py green (20 passed).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Two ways a single transient P->D fault permanently wedged a prefill rank,
with no node actually down:

(c) The transfer_worker loop's catch-all did `raise RuntimeError(...)`, which
killed the daemon thread. Its FastQueue shard then had no consumer, so every
room routed to that shard never reached Success: the sender polled forever, the
KV pages / receive state stayed held, the pool drained, and the prefill stopped
admitting new requests. One poison chunk or one unexpected bookkeeping error
took out a whole queue shard. Route the fault through _fail_transfer_chunk:
conclude THIS chunk's room Failed (its sender's poll() returns Failed and the
scheduler aborts the request), best-effort push Failed to each decode peer so
it does not wait out its own receive timeout, drop the room's transfer_infos,
and keep draining. Mirrors the inline send-failure path right above it; process
death is reserved for genuinely unrecoverable states. kv_chunk is bound to None
before the try so a fault at queue.get() itself is a quiet no-op.
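
The loop shape after the fix might resemble the sketch below. It uses a plain `queue.Queue` instead of FastQueue, a sentinel to stop the loop for testing, and an injected `fail_chunk` in place of _fail_transfer_chunk; all of these are assumptions.

```python
import queue


def transfer_worker(q, send, fail_chunk, stop):
    """Drain `q` until `stop` is dequeued; a fault fails one chunk, never the thread."""
    while True:
        chunk = None  # bound before the try, so a fault inside get() is a quiet no-op
        try:
            chunk = q.get()
            if chunk is stop:
                return
            send(chunk)
        except Exception:
            if chunk is not None:
                fail_chunk(chunk)  # conclude this chunk's room Failed, keep draining
```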

(a) MooncakeKVSender.init_time was set to None in __init__ and never assigned
anywhere, so poll()'s `if self.init_time is not None` Bootstrapping-timeout
branch was dead code: a decode peer that never registered kept the room
Bootstrapping forever, the request never concluded, and it held its pool slot
indefinitely. Set init_time at registration (the status goes Bootstrapping in
__init__), so the existing timeout fires after bootstrap_time_out and concludes
the room Failed.
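
The dead-branch bug and its fix can be shown in a few lines. `Sender` is a hypothetical stand-in for MooncakeKVSender, statuses are plain strings, and the clock is injectable so the timeout can be exercised without sleeping.

```python
import time


class Sender:
    def __init__(self, bootstrap_time_out, clock=time.monotonic):
        self.clock = clock
        self.bootstrap_time_out = bootstrap_time_out
        self.status = "Bootstrapping"
        self.init_time = clock()  # the fix: stamp at registration instead of leaving None

    def poll(self):
        """Conclude the room Failed if the peer never registers within the timeout."""
        if self.status == "Bootstrapping" and self.init_time is not None:
            if self.clock() - self.init_time >= self.bootstrap_time_out:
                self.status = "Failed"
        return self.status
```

With `init_time` left as None, the timeout branch could never run and `poll()` would report Bootstrapping forever.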

Out of scope (per the EPD fault-tolerance cut): adding a prefill->decode
heartbeat so prefill detects a decode that died mid-transfer -- that is
failover, not transient-fault survival.

+6 unit tests: sender sets init_time at registration / its bootstrap timeout
now fires / a Success still concludes without a spurious timeout;
_fail_transfer_chunk concludes the room + notifies decode / no-ops on a
pre-dequeue fault / survives a failing decode-sync. Adjacent PD tests green
(47 passed: embedding_transfer + encode_worker + the new file).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
…vents

The rebase onto origin/main correctly dropped EPD's -1 bootstrap_token retry guard (subsumed by upstream's store-token-before-flipping-Success atomicity), but over-deleted the adjacent `bootstrap_room = self.receivers[req_id].bootstrap_room` binding, which is NOT part of the guard: it is the loop-local still read by pop_prefill_metadata(bootstrap_room) and pop_mrope_delta(bootstrap_room) in the same Bootstrapped+Success branch.

Without it, generate_events() raises NameError on the first completed prefill->decode transfer of every EPD decode run, wedging the decode scheduler. Invisible to build / py_compile / golden tree-equality (runtime-only). Restore the single binding (matches origin/main).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
@chenht2022 chenht2022 force-pushed the hongtaoc/epd-encode branch from cfb6071 to 0aa7572 Compare June 16, 2026 10:21
chenht2022 and others added 2 commits June 17, 2026 05:55
…AM L2

P2 of the EPD perf/cache roadmap. The encode worker's vision-embedding
cache (content-hash -> ViT output, skip the tower on duplicate images)
was a single VRAM tier hardcoded at 4 GiB. This makes that capacity
env-configurable and adds an optional second tier in host DRAM, default
OFF so the steady-state path is unchanged.

Part 1 (L1 capacity env): replace the hardcoded 4<<30 with
TOKENSPEED_EPD_ENCODE_EMBED_CACHE_MB (whole MiB, mirroring the existing
TOKENSPEED_EPD_ENCODE_RING_SLOT_MB convention). Default 4096 MiB ==
the prior 4 GiB exactly.

Part 2 (host-DRAM L2, opt-in): TieredEmbeddingCache layers a host-DRAM
L2 under the VRAM L1 so duplicate images skip the tower even past the
VRAM working set (~150 Kimi-K2.5 images at 4 GiB). Tiers are exclusive
(a key lives in exactly one): an L1 eviction demotes the victim to L2
(device->host copy), an L2 hit promotes it back to L1 (host->device) and
drops the L2 copy, and put() lands in L1 and clears any stale L2 dup. An
L2 hit must land on GPU because the send path stages item.encoded into
the GPU RDMA ring. This mirrors HiCache's L1(GPU)/L2(host) layering
minus the distributed L3: content-hash routing pins each image to one
worker, so a per-worker local cache already captures the reuse.

Reuse the existing bytes-bounded EmbeddingCache for both tiers via a new
on_evict hook (fires only on capacity overflow, not on update/pop) and a
pop() that takes ownership without hit/miss/evict side effects. The
device<->host copies are injectable so the tiering is unit-testable
without a GPU; the defaults walk a (main, deepstack|None) tuple or a
bare legacy tensor. Copies run synchronously on the single-threaded
encode loop (first-correct; skipping the ViT is the win, the ~28MB copy
is cheap by comparison; pinned/RDMA-direct L2 is a follow-up).
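
The exclusive two-tier behaviour can be sketched without any tensors. `LruBytes` and `Tiered` are hypothetical stand-ins for EmbeddingCache and TieredEmbeddingCache, sizes are passed explicitly, and the device<->host copies are omitted.

```python
from collections import OrderedDict


class LruBytes:
    """Bytes-bounded LRU; on_evict fires only on capacity overflow."""

    def __init__(self, capacity, on_evict=None):
        self.capacity, self.on_evict = capacity, on_evict
        self.items, self.used = OrderedDict(), 0

    def get(self, key):
        if key in self.items:
            self.items.move_to_end(key)
            return self.items[key][0]
        return None

    def pop(self, key):
        """Take ownership of an entry without firing on_evict."""
        entry = self.items.pop(key, None)
        if entry is None:
            return None
        self.used -= entry[1]
        return entry[0]

    def put(self, key, value, size):
        self.pop(key)
        if size > self.capacity:
            return  # oversized: not cached in this tier
        self.items[key] = (value, size)
        self.used += size
        while self.used > self.capacity:
            old_key, (old_val, old_size) = self.items.popitem(last=False)
            self.used -= old_size
            if self.on_evict:
                self.on_evict(old_key, old_val, old_size)


class Tiered:
    def __init__(self, l1_bytes, l2_bytes):
        self.l2 = LruBytes(l2_bytes)
        self.l1 = LruBytes(l1_bytes, on_evict=self.l2.put)  # demote the L1 victim

    def put(self, key, value, size):
        self.l2.pop(key)  # clear any stale L2 duplicate
        self.l1.put(key, value, size)

    def get(self, key):
        hit = self.l1.get(key)
        if hit is not None:
            return hit
        entry = self.l2.items.get(key)
        if entry is None:
            return None
        value, size = entry
        self.l2.pop(key)               # exclusive tiers: drop the L2 copy
        self.l1.put(key, value, size)  # promote; may demote another key
        return value
```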

Wiring: TOKENSPEED_EPD_ENCODE_EMBED_CACHE_DRAM_MB defaults to 0
(disabled) -> _make_embedding_cache builds a plain single-tier
EmbeddingCache, leaving the default path behaviorally identical to
before. > 0 builds the tiered cache and logs the effective per-tier
capacities at startup. Both knobs are per-encode-process (per TP rank);
co-located ranks each hold a full L1+L2, so budget host DRAM as
tp_size * DRAM_MB.

_embedding_cache_bytes rejects negative / non-integer values with an
env-named error so a mis-set knob fails fast and legibly rather than as
an opaque int()/capacity crash deep in construction.
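
A hedged sketch of the fail-fast parsing: `embedding_cache_bytes` is a hypothetical stand-in for _embedding_cache_bytes, and treating an empty value like an unset one is an assumption.

```python
import os


def embedding_cache_bytes(env_name, default_mb):
    """Parse a whole-MiB capacity from the environment and return it in bytes."""
    raw = os.environ.get(env_name, "").strip()
    if not raw:
        return default_mb << 20  # unset or empty: use the default
    try:
        mb = int(raw)
    except ValueError:
        raise ValueError(f"{env_name}={raw!r} must be a whole number of MiB") from None
    if mb < 0:
        raise ValueError(f"{env_name}={raw!r} must be >= 0")
    return mb << 20
```

Naming the variable in the error message is what makes a mis-set knob legible at startup.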

Tests (52 green): EmbeddingCache on_evict/pop; TieredEmbeddingCache
demote/promote/exclusivity/counters/l2-disabled/oversized/stale-dup
(torch-free via injected copy stubs); env parsing (unset/empty/0/
negative/non-int) and the L1-only-vs-tiered selection branch; and
through the real consumer (EncodeWorker against a tiered cache: demote
then L2-hit skips the tower) plus the default torch copy helpers on real
tensors.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>