Skip to content

feat: data plane transfer queue integration#2439

Merged
terrykong merged 160 commits into
mainfrom
zhiyul/data_plane_plan
May 24, 2026
Merged

feat: data plane transfer queue integration#2439
terrykong merged 160 commits into
mainfrom
zhiyul/data_plane_plan

Conversation

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia commented May 7, 2026

What does this PR do ?

Summary

  • Introduces a TransferQueue-mediated data-plane for sync GRPO. Bulk per-token tensors live in TQ between rollout and train; the
    driver only handles per-sample slices and KVBatchMeta.
  • Backends supported: simple and mooncake_cpu
  • Wire format:
    • type-driven dispatch — tensors → maybe_pack_jagged (uint8 jagged for variable-length fields)
    • non-tensors →ack_object_array (verl-style np.ndarray(dtype=object) pickled into uint8 jagged). Object fields recorded in meta.extra_info[META_OBJECT_FIELDS] for read-side decode. Backends only ever see tensors.
  • Core principle: 1-hop offload. Data movement stays local to the producer (rollout actor → TQ) and the consumer (worker fetch → train). The driver never holds bulk between rollout finish and train fan-out — only metadata and small per-sample fields cross Ray.

Details in https://github.com/NVIDIA-NeMo/RL/blob/zhiyul/data_plane_plan/nemo_rl/data_plane/README.md

Scope

  • nemo_rl/data_plane/: codec, adapters (NoOp / TQ over simple + mooncake_cpu), preshard helpers, observability hooks, driver I/O
    (read_columns / write_columns).
  • nemo_rl/algorithms/grpo_sync.py: TQ-only sibling of grpo.grpo_train. Same algorithm/metrics; per-step lifecycle is prepare_step →
    rollout 1-hop put → meta-driven logprob/train → kv_clear.
  • nemo_rl/experience/sync_rollout_actor.py: rollout + flatten + first put encapsulated in one Ray actor. Bulk never visits driver.
  • nemo_rl/models/policy/tq_policy.py: TQ-mediated Policy subclass; meta-driven workers via worker_mixin._fetch.

Test

https://wandb.ai/nvidia/nemorl-dataplane-zhiyul?nw=nwuserzhiyul

Usage

  • You can potentially add a usage example below
# Add a code snippet demonstrating how to use this

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

  • ...

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia requested review from a team as code owners May 7, 2026 17:22
@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot Bot commented May 7, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia marked this pull request as draft May 7, 2026 17:22
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from bff0471 to d20a6ed Compare May 9, 2026 01:15
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test d20a6ed

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from d20a6ed to e7f6a91 Compare May 9, 2026 02:02
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia marked this pull request as ready for review May 9, 2026 02:02
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia requested a review from a team as a code owner May 9, 2026 02:02
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test e7f6a91

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test f8add06

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test c7cb642

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from c7cb642 to fa121a5 Compare May 9, 2026 02:27
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test fa121a5

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia added the CI:L1 Run doctests, unit tests, and functional tests label May 9, 2026
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch from fa121a5 to 8de60a8 Compare May 9, 2026 02:41
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 8de60a8

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test aeb273c

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 1596562

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test abada7e

ZhiyuLi-Nvidia and others added 14 commits May 22, 2026 19:59
…ests

Drop 9 source-grep tests, 1 duplicate import-smoke, 1 xfail-strict
TODO, and the FP8-calib regression test (tautological under the
positive-list calib filter — ``DP_CALIB_INPUT_FIELDS ∩
MESSAGE_LOG_BULK_FIELDS = ∅`` by definition, so the leak the test
guarded against is impossible by construction). Keep only:

* ``test_run_grpo_dispatches_both_trainers`` — behavioral: imports and
  calls ``_select_trainer`` directly; verifies dispatch to grpo_train
  (data_plane absent) and grpo_train_sync (data_plane.enabled=True).

* ``test_data_plane_client_abc_method_present`` — hasattr on the live
  class (not a source-grep); parametrized over the 8 DataPlaneClient
  ABC methods that every adapter must implement.

376 → 73 lines. 9 collected (was 18).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…itHub

The cu13 variant is now published on PyPI as a separate distribution
name (mooncake-transfer-engine-cuda13). Switch from the direct GitHub
release URL to a plain PyPI version pin. The wheel is byte-identical
(verified sha256: a96794f4d3c693e6e71ad85ef578a429ec69ab36e0c2f9b45b200d37e45d3cc0,
44,756,026 bytes), so this is a pure CDN switch — no behavioral change.

Eliminates a recurring github.com fetch-timeout failure mode on compute
nodes during NRL_FORCE_REBUILD_VENVS=true. PyPI (Fastly) is far more
reliable than github releases under concurrent fetches from a Slurm
batch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
In grpo_train_sync, the driver previously consumed
get_logprobs_from_meta / get_reference_policy_logprobs_from_meta via
their Ray-returned BatchedDataDict — getting the full (B, S) per-token
tensor through Ray's plasma store. That same tensor was also written
back to TQ by the worker leader (for train_from_meta to fetch later),
so every step paid two transfers for the same (B, S) per-token data.

Drop the Ray-side consumption: workers still write to TQ via
_write_back_result_field, and the driver now reads
prev_logprobs / reference_policy_logprobs from TQ alongside the existing
batched read for generation_logprobs / token_mask. One round-trip,
one materialization point.

Expected effect: shorter Ray scheduler queue + earlier plasma cleanup
right before training_prep, which previously inherited the back-pressure
of large outstanding plasma references. Targets the +13.5% on
policy_and_reference_logprobs and the +67% on training_prep observed in
the 32n8g DSV3 perf comparison.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Following d1bfe86c3, the driver-side fix alone was insufficient: even
though grpo_train_sync ignored the BatchedDataDict returned from
get_logprobs_from_meta / get_reference_policy_logprobs_from_meta, the
underlying _logprob_dispatch still ran ray.get() on the worker futures
which materialized the full (B, S) per-token tensor through Ray's
plasma store before the aggregate_fn ran. Empirically the per-step
regression in 11973965 stayed at ~125-128 s, identical to the unpatched
DP-warm baseline.

This patch eliminates the Ray transfer at the source: workers return
None from get_logprobs_presharded / get_reference_policy_logprobs_presharded
once the per-token tensor has been committed to TQ via the existing
_write_back_result_field leader path. Aggregators handle all-None
results by returning None; _logprob_dispatch propagates None up to the
caller. grpo_train_sync (already patched in d1bfe86c3) reads the
tensor from TQ instead.

Wire cost: ~6 MB per step (B=512 × S~1536 × fp32 × 2 fields) and
matching plasma references freed sooner — targets the +13.5 % regression
on policy_and_reference_logprobs and the +67 % on training_prep.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…eturn None

Following 43f55293f the worker entry points
get_logprobs_presharded / get_reference_policy_logprobs_presharded
always return None — the per-token tensor is committed to TQ via
_write_back_result_field. The accompanying _aggregate_logprob_results /
_aggregate_reference_logprob_results helpers always saw an
all-None list and returned None, so the aggregate_fn dispatch was dead
code paying a parameter-and-callback cost.

Drop both helpers. Simplify _logprob_dispatch:
  * remove aggregate_fn parameter
  * drop the unused unsorted_indices result (there is no result to reorder)
  * call get_all_worker_results purely for synchronisation

get_logprobs_from_meta / get_reference_policy_logprobs_from_meta now
return None explicitly; their return type is honest at the type-checker
level.

Also worker_mixin: drop the explicit ``return None`` (implicit), add
``del result`` after _write_back_result_field so the BatchedDataDict
holding the per-token tensor is released before the worker idles
waiting for the next dispatch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…th for writeback-leader

Background — previously every TQ-aware policy worker class had to
override _is_writeback_leader() and re-derive its (tp, cp, pp) coords
from torch.distributed / device_mesh / parallel_state. The default in
TQWorkerMixin was deliberately wrong (always-True at CP=1) so subclasses
were forced to override; missing or mismatched overrides produced silent
duplicate writes to Mooncake (the -601 ILLEGAL_CLIENT bug). That's
distributed state duplicated across 4 implementations to mirror
information Ray's dispatcher already has via
``sharding_annotations.get_worker_coords(worker_idx)``.

This patch makes Ray's worker_coords the single source of truth:

1. TQWorkerMixin grows ``set_sharding_coords(coords: dict)`` — a setter
   the worker-group calls once per actor right after construction.
   Stored on ``self._sharding_coords``.
2. ``RayWorkerGroup._create_workers_from_bundle_indices`` pushes coords
   into every worker that exposes the setter, immediately after the
   workers list is populated. Workers without the method are skipped.
3. ``TQWorkerMixin._is_writeback_leader()`` is now a 5-line reader of
   ``self._sharding_coords`` and matches Ray's own
   ``output_is_replicated`` semantics: (tp, cp, pp) all coord-0.
4. Subclass overrides in DTensor V1 / DTensor V2 / Megatron are deleted —
   no more "subclass must override" footgun. The V1-only override that
   patched the -601 bug (commit ecd8492) is also gone; the base now
   handles every worker class correctly.

Bug class extinct: it is structurally impossible for a new TQ-aware
worker class to forget the leader-rank logic. If sharding_coords are
present, the gating works; if not, the default (single-worker, all
True) is safe.

Net diff: +43 / -51. No public API change. No per-call kwarg injection.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…e of truth for writeback-leader"

This reverts commit d7cde02e7fd222d1cb8ba9df035c9f1ba7a54704.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
… -601 duplicate-write

Root cause of the -601 ILLEGAL_CLIENT crash: _get_replica_group() returned None
for CP=1, so _is_replica_leader() was always True for every TP sibling, causing
all siblings to write to Mooncake concurrently on the same key.

Changes:
- Add REPLICATED_AXES constant and NamedSharding.is_axis_zero(coords, axes) as
  the single shared predicate for leader-rank gating (driver-side and worker-side).
- Replace _is_writeback_leader() with _local_coords() abstract method; workers
  feed their TP/CP/PP local ranks and _is_replica_leader() calls is_axis_zero.
- Drop the CP=1 early-return-None guard in _get_replica_group() on all workers;
  replica_group.size() > 1 in _fetch() controls the broadcast-vs-independent path.
- Thread is_leader through _broadcast_batched_data_dict() instead of re-deriving
  it from get_rank() == src inside the helper.
- Add grpo_dp_simple.sh and grpo_dp_mooncake.sh functional tests; wire into L1.
- Add test_writeback_pipeline_e2e.py unit test pinning the non-leader no-write contract.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Drop unused LogprobOutputSpec / ReferenceLogprobOutputSpec imports in
tq_policy.py (F401) and collapse a ternary in grpo_sync.py to satisfy
ruff format.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…romote

`_promote_1d_leaves` and `_from_wire` iterated `td.keys(include_nested=True,
leaves_only=True)`, which silently excludes non-tensor leaves
(NonTensorData / NonTensorStack). Object fields like `content` and
`MESSAGE_LOG_BULK_FIELDS` were dropped from the rebuilt TensorDict on
the mooncake_cpu put / get path, surfacing later as a `KeyError`.

Switch to top-level `td.keys()` so non-tensor leaves are preserved.
Tighten the post-rebuild assertion to use the same enumeration so it
actually detects the silent drop class it was meant to guard against.

Update `test_object_and_tensor_mixed_round_trip_backends` to mirror the
e2e GRPO `kv_first_write` flow (tensor-only `DP_TRAIN_FIELDS` registration,
production-shape `bulk_batch` with `np.ndarray(dtype=object)` content,
mixed read via `read_columns`).

Add `test_promote_1d_leaves_object_array_roundtrip` to pin the helper
invariant with the production TD shape (1D + 2D tensor + object array).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 5d7288f

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test f2ff882

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test e1a9cb3

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 298b969

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 7cead7c

2 similar comments
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 7cead7c

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 7cead7c

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 25249c2

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 25249c2

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 7ae96a2

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test da72c32

@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 14cd92d

ZhiyuLi-Nvidia and others added 2 commits May 24, 2026 00:34
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
CI's build-container runner is aarch64 (GCP GPU runner; uv reports
`aarch64-unknown-linux-gnu`).  The previous marker
`platform_machine == 'x86_64'` made uv silently exclude
mooncake-transfer-engine-cuda13 from the resolution during the Docker
build, even though upstream publishes both x86_64 and aarch64 wheels.
That left the container without mooncake and broke every
`mooncake_cpu` backend test.

Fix:
- pyproject.toml: extend the marker to also accept aarch64 so mooncake
  is installed on both architectures supported by the upstream wheel.
- uv.lock: regenerate.  New marker is
    (platform_machine == 'aarch64' and sys_platform == 'linux')
    or (platform_machine == 'x86_64' and sys_platform == 'linux')
- tests/functional/grpo_dp_mooncake.sh: drop the runtime skip-if-no-mooncake
  guard.  With mooncake now installed in the container, the guard is
  unnecessary and was masking real test failures.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
@ZhiyuLi-Nvidia
Copy link
Copy Markdown
Contributor Author

/ok to test b63c18f

Copy link
Copy Markdown
Collaborator

@terrykong terrykong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CI:L1 Run doctests, unit tests, and functional tests Documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants