Skip to content

feat(rl): weight-transfer control plane for RL online weight sync#342

Draft
qywu wants to merge 16 commits into
mainfrom
qywu/weights_sync_api
Draft

feat(rl): weight-transfer control plane for RL online weight sync#342
qywu wants to merge 16 commits into
mainfrom
qywu/weights_sync_api

Conversation

@qywu

@qywu qywu commented Jun 2, 2026

Copy link
Copy Markdown
Collaborator

Summary

Adds an HTTP control plane for RL online weight sync — updating model weights
in place during serving, without restarting the engine. The endpoint surface
(paths, methods, request/response JSON, call lifecycle) follows the
weight-transfer contract that common RL training frameworks (verl / slime /
AReaL / miles) already speak, so existing trainer code drives TokenSpeed
unchanged. Heavy weight payloads travel out-of-band (NCCL broadcast / CUDA-IPC);
only metadata flows over HTTP.

Off by default — gated behind --enable-weight-transfer (or
TOKENSPEED_SERVER_DEV_MODE=1).

Draft: the control plane, lifecycle, and tests are complete and green. The
worker-side weight load and a couple of deeper items are deferred (see
below) — they need GPU hardware / C++ scheduler changes / the external smg
servicer to implement and verify.

Endpoints

POST /init_weight_transfer_engine, /start_weight_update, /update_weights,
/finish_weight_update, /pause, /resume; GET /get_world_size,
/is_paused. Full reference + lifecycle in docs/serving/weight-transfer.md;
trainer-side client snippets in examples/rl/.

What's included

  • Config + gatingWeightTransferConfig, --enable-weight-transfer,
    --weight-transfer-config, --weight-transfer-port,
    TOKENSPEED_SERVER_DEV_MODE.
  • WeightTransferManager — lifecycle state machine (init → start → update →
    finish), ordering enforcement, get_world_size(include_dp), and pause/resume
    (abort / wait / keep) via a generation admission gate + the existing
    RWLock.
  • io_struct — reconcile UpdateWeightsFromDistributedReqInput fields
    (names / dtype_names / shapes / packed*) and fix a latent
    singular/plural drift
    with engine.py that would have crashed if called;
    add Start / FinishWeightUpdate structs.
  • Entrypoints — engine-embedded FastAPI control app (8 endpoints) launched on
    AsyncLLM's loop; the ts serve sidecar proxies the endpoints on the public
    port.
  • mappingworld_size_across_dp / world_size_per_dp helpers.
  • Tests / docs / examples — contract, state-machine, get_world_size
    matrix, backend parsing, sidecar + full e2e (real uvicorn + httpx, including
    pause actually blocking a live generation on the same loop);
    docs/serving/weight-transfer.md; examples/rl/.

Tests

pytest test/runtime/weight_transfer/83 passed, 1 skipped (skip = opt-in
live-server test gated on TOKENSPEED_E2E_URL). pre-commit run --all-files
clean. No regressions in the existing suites for touched files.

Deferred (need GPU / C++ scheduler / external smg servicer)

  • Worker-side weight load — the scheduler/model-runner NCCL/IPC receive +
    load_weights. nccl update drives the existing distributed transport;
    ipc update validates the payload then errors (worker path deferred).

Error model: lifecycle endpoints (init/start/update/finish) leave
manager errors unwrapped → 500; only a missing required field or invalid JSON is
a 400. pause catches ValueError → 400 (and rejects an unknown mode with
400), otherwise 500; resume → 500 on error. update_kind="dense" is accepted
and ignored; update_kind="sparse_flat" is rejected (sparse not implemented).

  • keep-mode KV preservation in the C++ scheduler. Frontend admission
    gating works for all three modes; abort / wait are fully functional.
  • Native gRPC transport (cross-repo smg change). The engine-embedded app +
    sidecar proxy carries the control plane today.
  • Startup launch hook — the in-engine app comes up best-effort on AsyncLLM's
    loop; for guaranteed availability before the very first request, the engine
    bootstrap should call the launcher explicitly.

qywu added 16 commits June 2, 2026 06:11
Expose HTTP weight-transfer endpoints for RL online serving so common trainer
code (verl/slime/AReaL/miles) drives tokenspeed unchanged. The HTTP surface
follows the weight-transfer contract those trainers speak (paths, methods,
request/response JSON, lifecycle); heavy payloads stay out-of-band
(NCCL/CUDA-IPC), only metadata flows through HTTP.

- config+gating: WeightTransferConfig, --enable-weight-transfer,
  --weight-transfer-config, --weight-transfer-port, TOKENSPEED_SERVER_DEV_MODE
  (off by default).
- WeightTransferManager: lifecycle state machine (init/start/update/finish),
  ordering enforcement, get_world_size(include_dp), pause/resume via an
  admission gate + RWLock, NCCL/IPC init/update parsing (incl. the
  insecure-serialization opt-in).
- io_struct: reconcile UpdateWeightsFromDistributedReqInput fields
  (names/dtype_names/shapes/packed*) and fix the singular/plural drift with
  engine.py; add Start/FinishWeightUpdate structs.
- entrypoints: engine-embedded FastAPI control app (8 endpoints) launched on
  AsyncLLM's loop; sidecar proxies the endpoints on the public port.
- mapping: world_size_across_dp / world_size_per_dp helpers.
- tests/docs/examples: contract + state-machine + get_world_size matrix +
  backend parsing + sidecar e2e; docs/serving/weight-transfer.md; examples/rl/.

Deferred (require GPU hardware, C++ scheduler, or the external smg servicer to
implement and verify): worker-side NCCL/IPC weight load, keep-mode KV
preservation, and native gRPC transport. nccl update drives the existing
distributed transport; ipc update validates then returns 501.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
Drive the real WeightTransferManager through the real FastAPI app over a real
socket (uvicorn + httpx), covering the full weight-update lifecycle, the error
contract (409/400/501), and -- most importantly -- the pause/resume admission
gate actually blocking a concurrent generation on the same event loop and
unblocking on resume.

AsyncLLM's heavy GPU imports are stubbed by GateBackedStub, which reproduces
AsyncLLM's admission-gate methods 1:1 using the real RWLock. Also adds an opt-in
test_live_server_lifecycle gated on TOKENSPEED_E2E_URL to drive a real
ts-serve --enable-weight-transfer (backend-independent surface).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…I directly

Reword inline comments, docstrings, CLI help, docs, and examples for the
weight-transfer control plane to describe the endpoints/lifecycle/config
directly, and drop the optional installed-router parity test. No functional
changes; the weight-transfer suite still passes (83 passed, 1 skipped).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…d=dense

- HTTP: lifecycle endpoints (init/start/update/finish) no longer wrap manager
  errors in custom 409/501/400 codes; they surface as 500, and only a missing
  required field or invalid JSON stays a 400. pause/resume catch ValueError ->
  400 and otherwise return 500; an invalid pause mode stays a 400.
- manager: accept-and-ignore update_kind="dense" (and num_updates_list) in the
  nccl/ipc update_info; reject update_kind="sparse_flat" with a clear error
  (sparse weight updates are not implemented).
- tests/docs updated accordingly. Happy path unchanged.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…y default

- Add a SGLang-dialect weight-sync router (init_weights_update_group,
  update_weights_from_distributed [dtypes->dtype_names] / _from_tensor /
  _from_disk, pause_generation/continue_generation, GET flush_cache,
  release/resume_memory_occupation, abort_request, health_generate),
  forwarding to AsyncLLM's existing methods. Covers slime/miles and verl's
  SGLang rollout.
- Serve it on the SAME in-engine control app/port as the native endpoints
  (one app, one URL); the sidecar proxies both to engine_http_url.
- Default the RL control plane ON (opt out with --no-enable-weight-transfer);
  drop the TOKENSPEED_SERVER_DEV_MODE gate. WARNING: these endpoints overwrite
  weights, reload checkpoints from disk, and pause/abort serving, and are
  exposed on the control port.
- Tests (test_sglang_compat.py + updated config/sidecar) + docs + examples.

Worker-side NCCL/IPC receive+load remains the deferred GPU piece (both dialects).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…trol_port

- Remove the --enable-weight-transfer gate entirely: the RL weight-sync
  control plane (native + SGLang-compatible endpoints) is always on. Drops
  enable_weight_transfer / weight_transfer_enabled / TOKENSPEED_SERVER_DEV_MODE.
  WARNING: the endpoints (overwrite weights, reload checkpoints from disk,
  pause/abort) are always exposed on the control port; on untrusted networks
  restrict it with network controls / an auth proxy.
- Rename weight_transfer_port -> rl_control_port (--rl-control-port), and the
  coupled internals (_add_rl_control_port, _maybe_launch_rl_control_plane,
  _serve_rl_control_plane, _rl_control_task): the port hosts the whole RL
  control surface (weight sync + pause/resume + memory occupation), not just
  weight transfer. Distinct from the sidecar's --control-port.
- Update tests/docs/examples accordingly.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
Keep this branch (the PR) to the runtime code. The RL weight-sync docs
(docs/serving/weight-transfer.md + the VitePress nav entry), examples
(examples/rl/), and tests (test/runtime/weight_transfer/) are preserved on the
qywu/weight_sync_dev branch (snapshot of this branch before the split).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
TOKENSPEED_ALLOW_INSECURE_SERIALIZATION guarded the IPC tensor path, but that
path raises NotImplementedError before any deserialization happens, so the gate
protected nothing live. Remove it and leave a SECURITY note at the deferred
worker-side receive site (prefer structured ipc_handles; never pickle.loads
caller-supplied bytes off rl_control_port).

Revert the world_size_across_dp / world_size_per_dp additions to Mapping:
world_size_across_dp was a pure alias for world_size and both had a single
caller. Inline the math in WeightTransferManager.get_world_size instead, leaving
the shared Mapping primitive untouched.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
The RL control plane is launched by AsyncLLM._serve_rl_control_plane (which
calls build_weight_transfer_app). The standalone-server helpers
(build_weight_transfer_server / run_weight_transfer_server / attach_to_main_loop)
are leftovers from the pre-consolidation design with no callers; remove them,
their now-unused imports (uvicorn, Awaitable/Callable), and refresh the stale
module docstring.

Also drop the StartWeightUpdate/FinishWeightUpdate Req{Input,Output} dataclasses
from io_struct -- never referenced (the lifecycle uses plain bools).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
start_update no longer takes or stores is_checkpoint_format -- the worker-side
layerwise reload it would drive is deferred, so the flag had no consumer.
/start_weight_update still accepts the field for API compatibility and ignores
it; it will be threaded back through when the worker finalize lands.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…trol_*

Rename engine_http_url -> rl_control_url and _proxy_to_engine -> _proxy_to_rl_control
so the sidecar's wiring matches the rl_control_port / _serve_rl_control_plane
vocabulary used everywhere else, and to disambiguate the RL HTTP proxy from
_grpc_call (which also targets the engine over gRPC).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
The orchestrator-side sidecar IS the control surface (it fronts the gateway and
engine over HTTP/gRPC), so 'http_server' was too generic. Rename the module to
control_server.py and build_server -> build_control_server, update serve_smg's
import/call, refresh the prose references in protocol.py / async_llm.py, and
rename the test to test_control_server.py.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…st loop

start() was a dead blocking wrapper -- the orchestrator launches the sidecar via
build_control_server + a daemon thread, nothing calls start(). Remove it.

Harden TestGrpcDirect: give each test a fresh current event loop (and restore
loop state in tearDown), so grpc.aio.insecure_channel binds to a live loop
regardless of loop state left by other tests in the suite (the module-global
channel/stub made this order-dependent).

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
…v + load)

Implement the receive half of update_weights_from_distributed on the inference
worker so an RL trainer (slime/SGLang sender) can push weights in place:

- ModelRunner.init_weights_update_group: join the trainer's NCCL group via
  StatelessProcessGroup + PyNcclCommunicator at rank rank_offset+global_rank.
- ModelRunner.update_weights_from_distributed: receive each named weight from
  rank 0 (in order, stream-synced), apply via the model's load_weights().
- request_handler dispatches Init/UpdateWeights*ReqInput -> the ModelRunner
  methods -> replies with the *ReqOutput types; event_loop passes model_runner in.

Verified on 2xH100 (real ModelRunner methods receive a broadcast + load_weights,
exact match) + a unit test of the worker dispatch. TP>1 fan-out to all workers
is a follow-up; current routing covers the TP=1 rollout.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
Validated end-to-end as the rollout engine for slime GRPO (Qwen2.5-0.5B
gsm8k, disaggregated NCCL weight sync): rollout -> reward -> train ->
weight sync loops and the model learns (eval/gsm8k 0.493 -> 0.534 over
100 rollouts).

- serve_smg: bind the smg Prometheus exporter to a free port instead of a
  fixed 8413. Any fixed port collides on TIME_WAIT/restart/multi-engine,
  so the gateway panics AddrInUse, leaves no healthy worker, and the
  first request fails tokenizer_not_found / no worker available.
- control_server /generate: (1) default `model` to the single served
  model when the body omits it (sglang clients send none); (2) unwrap the
  gateway's 1-element list to an object for single prompts (clients index
  output["meta_info"]); (3) expose response token ids as
  meta_info.output_token_logprobs when return_logprob is set -- RL
  trainers (slime/verl) read response tokens from there, and without it
  the trainer sees empty responses and never produces a gradient.
- model_runner: join the trainer's weight-update group via
  torch.distributed (rendezvous + dist.broadcast) instead of
  StatelessProcessGroup/PyNcclCommunicator, which keys its TCP store
  differently and deadlocks against a torch trainer group.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
Pair the two RL weight-sync HTTP dialects symmetrically: vllm_compat_http.py
(vLLM-style -- /init_weight_transfer_engine, /start_weight_update, ...) alongside
sglang_compat_http.py (SGLang -- /update_weights_from_distributed, ...), both
mounted on one in-engine RL control app over the shared WeightTransferManager /
AsyncLLM backend. Renames build_weight_transfer_app -> build_vllm_compat_app and
updates all references. Also fixes inconsistent 'vLLM-native' comments (one
pointed at sglang_compat_http.py by mistake). Backend (engine/weight_transfer/)
unchanged.

Signed-off-by: Qingyang Wu <willqywu@gmail.com>
dongjiyingdjy added a commit that referenced this pull request Jun 10, 2026
The IMA bug was in deep_gemm's paged MQA logits scheduler (fixed
upstream in PR #342); CUDA_LAUNCH_BLOCKING was added for debugging
and is no longer needed.

Signed-off-by: jiyingd <jiyingd@nvidia.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: jiyingd <87510204+dongjiyingdjy@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant