异步高性能流式数据引擎
🎉 概述
TransferQueue 是一款高性能数据存储与传输模块,具备全景数据可见性和流式调度能力,专为训练后处理工作流中的高效数据流而优化。
TransferQueue 提供细粒度、子样本级的数据管理和负载均衡能力。它作为数据网关,解耦了计算任务间的显式数据依赖,支持分而治之的处理方式,显著简化算法控制器设计。
🔄 更新动态
- 2026年6月9日:🔥 TransferQueue 已被腾讯混元团队开发的多模态模型统一强化学习框架 UniRL 采用。
- 2026年4月15日:🔥 TransferQueue 已被 Relax 采用!通过
StreamingDataLoader抽象,它在集群中以微批次粒度调度训练数据,减少了单控制器设置中的同步壁垒。 - 2026年4月10日:🔥 TransferQueue 现已正式集成到 verl!在 128 × H100 GPU 集群上,我们实现了多模态训练后处理端到端性能提升 49.1%! 更多详情请参考 我们的博客。
- 2026年2月8日:🔥 PR#26、PR#28 中的高级 API 极大简化了初始化和使用流程。现在,您可以使用类 Redis 风格的 API 来利用 TransferQueue 提供的大部分高级功能!
- 2026年1月28日:我们实验性地引入了
StreamingDataLoader接口,用于构建全流式的生产-消费管道。详情请参考我们的 tutorials/06_streaming_dataloader.py。 - 2025年12月30日:TransferQueue 与 verl 的集成已在 DAPO 算法上完成大规模测试 (64 节点,1024 卡)。它显著优化了主机内存利用率并加速了数据传输。更多详情敬请期待!
- 2025年12月20日:🔥 官方 教程 发布!欢迎查阅。
- 2025年11月10日:我们将数据检索逻辑从 TransferQueueController 中解耦 PR#101。现在您可以实现自定义的
Sampler来定制数据消费方式。 - 2025年11月5日:我们提供了
KVStorageManager,简化了与基于 KV 的存储后端的集成 PR#96。首个可用的基于 KV 的后端是 openYuanrong。 - 2025年11月4日:数据分区功能在 PR#98 中可用。现在您可以定义逻辑数据分区来管理训练/验证/测试数据集。
- 2025年10月25日:存储后端现已支持插件化 PR#66。您现在可以尝试将自己的存储后端与 TransferQueue 集成!
- 2025年10月21日:与 verl 的早期集成已准备就绪 verl/pull/3649。后续 PR 将通过完全解耦数据与控制流来优化单控制器架构。
- 2025年7月22日:我们在知乎上发布了一系列中文博客文章:知乎 1、2。
- 2025年7月21日:我们在 verl 社区发起了 RFC verl/RFC#2662。
- 2025年7月2日:我们发表了论文 AsyncFlow。
🧩 组件
控制平面:全景数据管理
在控制平面中,TransferQueueController 将每个训练样本的生产状态和消费状态作为元数据进行跟踪。当所有所需数据字段准备就绪(即已写入 StorageManager)后,该数据样本即可被下游任务消费。
我们还会跟踪每个计算任务(例如 generate_sequences、compute_log_prob 等)的消费历史。因此,即使不同的计算任务需要相同的数据字段,它们也能独立消费数据,互不干扰。
为了使数据检索过程更具可定制性,我们提供了 Sampler 类,允许用户定义自己的数据检索和消费逻辑。详情请参考自定义部分。
控制平面已实验性支持负载均衡能力。此设计使我们能够从单一控制器卸载部分数据管理功能。详情请参见#PR70。
数据平面:分布式数据存储
在数据平面中,我们采用可插拔设计,使 TransferQueue 能够根据用户需求集成不同的存储后端。
具体而言,我们提供了 StorageManager 抽象类,其定义的核心 API 如下:
async def put_data(self, data: TensorDict, metadata: BatchMeta) -> Noneasync def get_data(self, metadata: BatchMeta) -> TensorDictasync def clear_data(self, metadata: BatchMeta) -> None
该类封装了 TransferQueue 系统内的核心交互逻辑。您只需编写一个简单的子类,即可集成自定义的存储后端。详情请参考自定义部分。
目前,我们支持以下存储后端:
- SimpleStorage:基础的 CPU 内存存储,数据格式约束最小,易于使用。
- Yuanrong(使用指南,测试版,#PR107,#PR96):昇腾原生数据系统,提供包括 HBM/DRAM/SSD 在内的分层存储接口。
- MooncakeStore(测试版,#PR162):高性能、基于 KV 的分层存储,支持 GPU 与 DRAM 之间的 RDMA 传输。
- RayRDT(alpha 版,#PR167):Ray 的新特性,允许 Ray 在 Ray 角色之间直接存储和传递对象。
其中,SimpleStorageUnit 作为我们的默认存储后端,由 AsyncSimpleStorageManager 类进行协调。每个存储单元可部署在独立节点上,实现分布式数据管理。
SimpleStorageUnit 采用如下二维数据结构:
- 每一行对应一个训练样本,在相应的全局批次中分配有唯一索引。
- 每一列代表计算任务的输入/输出数据字段。
这种数据结构设计源于训练后处理过程的计算特性,即每个训练样本在任务流水线中以接力方式生成。它提供了精确的寻址能力,支持以流式方式进行细粒度、并发的数据读写操作。
用户界面:高层级与低层级 API
| 层级 | 类型 | 风格 | 细粒度访问 | 流式处理 | 采样器 | 多后端支持 |
|---|---|---|---|---|---|---|
| 高层 | KV 接口 (PR#28) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| 高层 | StreamingDataLoader (PR#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| 低层 | TransferQueueClient | 基于元数据 | ✓ | ✓ | ✓ | ✓ |
基于键值(Key-Value)的 API
为简化 TransferQueue 的使用,我们提供了类 Redis 风格的高层级 API,该 API 可暴露其大部分高级功能(PR#28)。
方法
- (async_)kv_put:通过键插入/更新多列样本,可选择添加元数据标签。
- (async_)kv_batch_put:高效批量插入多个键值对。
- (async_)kv_batch_get:(通过键)检索样本,支持(按字段)选择列。
- (async_)kv_list:列出分区中的键和标签(元数据)。
- (async_)kv_clear:从存储中移除键值对。
核心特性
- 类 Redis 语义:熟悉的 KV 接口(Put/Get/List),实现零学习成本。
- 细粒度访问:可更新或检索键(行)内的特定字段(列),无需执行整行操作。
- 分区隔离:存储命名空间的逻辑分离。
- 元数据标签:用于状态跟踪的轻量级元数据。
- 可插拔后端:支持多种后端。
详细使用示例请参考 tutorials/basic.ipynb 和 tutorials/02_kv_interface.py。
StreamingDataLoader API
该 API 旨在作为标准 PyTorch DataLoader 的即插即用替代品,允许每个进程级(rank)自动消费数据,无需单一控制器干预。
在此场景中,TransferQueueController 充当数据分发的辅助控制器,配合用户定义的 Sampler 类来组织数据流。
它封装了各种并行策略所需的复杂调度和数据传输逻辑,将 TransferQueue 无缝集成到现有训练工作流中,并简化了分布式框架的开发。
更多详情请参见 路线图 和 tutorials/06_streaming_dataloader.py。
低层级原生 API
TransferQueue 的原生接口在 TransferQueueClient 中实现。它通过原生的原子操作提供最大灵活性。
开发人员可直接利用 TransferQueueClient 实现需要细粒度控制和全流式数据调度的高级功能,相关示例请参见以下教程:
- tutorial/03_metadata_concepts.py
- tutorial/04_understanding_controller.py
- tutorial/05_custom_sampler.py
🔥 应用案例
同构部署示例
verl
将 TransferQueue 集成到 verl 中的主要目的是缓解单一控制器 RayPPOTrainer 的数据传输瓶颈。当前,所有 DataProto 对象都必须通过 RayPPOTrainer 进行路由,这导致整个训练后系统存在单点瓶颈。
verl 的官方集成版本可在 verl/pull/5401 获取,设计文档请参见 [RFC] PPOTrainer with TransferQueue Integration。您也可以参考我们的 示例代码,其中我们以高级方式模拟了 verl 的使用场景。
异构部署示例
我们已通过 TransferQueue 实验性地实现了标准化、全流式的分布式工作流。
通过利用 RankAwareSampler 和 StreamingDataLoader 接口,我们构建了精简的微批处理级生产者-消费者管道。这种设计无需手动确定不同并行策略下的数据分发逻辑(这是单一控制器模式中的典型复杂性),从而极大简化了框架设计。
更多详情请参考我们的 路线图 和 tutorials/05_streaming_dataloader.py。
🚀 快速开始
使用 Python 包
pip install TransferQueue
从源代码安装
-
从 GitHub 仓库克隆源代码
git clone https://github.com/Ascend/TransferQueue/ cd TransferQueue -
从源代码安装
pip install .
从源代码构建 wheel 包
-
从 GitHub 仓库克隆源代码
git clone https://github.com/Ascend/TransferQueue/ cd TransferQueue -
安装依赖
pip install build -
构建并安装
python -m build --wheel pip install dist/*.whl
📊 性能表现
简单场景:常规 Tensor
复杂场景:常规 Tensor + NestedTensor + NonTensor
注意:openYuanrong 基准测试仅使用单个 NPU,因此无法体现多 NPU 的扩展性。此外,openYuanrong 的测试硬件环境与其他后端不同。
有关详细的性能基准测试,请参考 完整基准测试报告。
压力测试
除吞吐量外,我们还验证了高并发下的稳定性。我们提供了一份 压力测试报告,其中展示了在 4 个节点上,8192 个并发客户端向 TransferQueue 写入 2 TB 数据的场景。系统保持稳定,未出现任何崩溃或数据丢失。
🛠️ 自定义 TransferQueue
定义您自己的数据检索逻辑
我们提供了一个 BaseSampler 抽象类,它定义了以下接口:
@abstractmethod
def sample(
self,
ready_indexes: list[int],
batch_size: int,
*args: Any,
**kwargs: Any,
) -> tuple[list[int], list[int]]:
"""Sample a batch of indices from the ready indices.
Args:
ready_indexes: List of global indices for which all required fields of the
corresponding samples have been produced, and the samples are not labeled as
consumed in the corresponding task.
batch_size: Number of samples to select
*args: Additional positional arguments for specific sampler implementations
**kwargs: Additional keyword arguments for specific sampler implementations
Returns:
List of sampled global indices of length batch_size
List of global indices of length batch_size that should be labeled as consumed
(will never be retrieved in the future)
Raises:
ValueError: If batch_size is invalid or ready_indexes is insufficient
"""
raise NotImplementedError("Subclasses must implement sample")
在本设计中,我们通过两个返回值将数据获取和数据消费分离,这样可以轻松控制样本替换。我们已实现两种参考设计:SequentialSampler 和 GRPOGroupNSampler。
Sampler 类或实例应在初始化期间传递给 TransferQueueController。在每次 get_meta 调用时,您可以向 Sampler 提供动态采样参数。
from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info
# Option 1: Pass the sampler class to the TransferQueueController
controller = TransferQueueController.remote(GRPOGroupNSampler)
# Option 2: Pass the sampler instance to the TransferQueueController (if you need custom configuration)
your_own_sampler = YourOwnSampler(config)
controller = TransferQueueController.remote(your_own_sampler)
# Use the sampler
batch_meta = client.get_meta(
data_fields=["input_ids", "attention_mask"],
batch_size=8,
partition_id="train_0",
task_name="generate_sequences",
)
有关更多详细信息,请参阅 tutorial/05_custom_sampler.py。
如何集成新的存储后端
数据平面的组织结构如下:
transfer_queue/
├── storage/
│ ├── __init__.py
│ │── simple_backend.py # Default distributed storage backend (SimpleStorageUnit) by TQ
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
│ │ ├── __init__.py
│ │ ├──base.py # StorageManager, KVStorageManager, StorageManagerFactory
│ │ ├──simple_storage_manager.py # AsyncSimpleStorageManager
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
│ │ └──mooncake_manager.py # MooncakeStorageManager
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
│ │ ├── __init__.py
│ │ ├── base.py # StorageKVClient, StorageClientFactory
│ │ ├── yuanrong_client.py # YuanrongStorageClient
│ │ ├── mooncake_client.py # MooncakeStorageClient
│ │ └── ray_storage_client.py # RayStorageClient
要将 TransferQueue 与自定义存储后端集成,首先需实现一个继承自 StorageManager 的子类。该子类充当 TransferQueue 系统与目标存储后端之间的适配器。对于基于 KV 的存储后端,您只需继承 KVStorageManager,它可用作所有基于 KV 的后端的通用管理器。
分布式存储后端通常都有自己的原生客户端作为存储系统的接口。在这种情况下,可以参照 storage/clients 目录中提供的示例,为该客户端编写一个底层适配器。
系统为 StorageManager 和 StorageClient 均提供了工厂类,以方便集成。在工厂类中添加必要的必填参数说明有助于提升整体用户体验。
✏️ 贡献指南
热烈欢迎贡献!
我们鼓励提出新想法、功能建议和用户体验反馈,您可以随时提交 issue 或 PR。我们会尽快回复。
建议使用 pre-commit 以获得更好的代码格式。
# install pre-commit
pip install pre-commit
# run the following command in your repo folder, then fix the check before committing your code
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always
📑 引用
如果您发现本仓库对您有所帮助,恳请引用我们的论文:@article{han2025asyncflow,
title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
journal={arXiv preprint arXiv:2507.01663},
year={2025}
}
