TransferQueue:基于分布式存储的高性能数据流转管理项目

异步高性能流式数据引擎

分支6Tags3

TransferQueue:面向高效训练后处理的异步流式数据管理模块

论文 | 知乎 | 微信

Ask DeepWiki.com GitHub Repo stars GitHub commit activity


🎉 概述

TransferQueue 是一款高性能数据存储与传输模块,具备全景数据可见性和流式调度能力,专为训练后处理工作流中的高效数据流而优化。

TransferQueue 提供细粒度、子样本级的数据管理和负载均衡能力。它作为数据网关,解耦了计算任务间的显式数据依赖,支持分而治之的处理方式,显著简化算法控制器设计。

🔄 更新动态

  • 2026年6月9日:🔥 TransferQueue 已被腾讯混元团队开发的多模态模型统一强化学习框架 UniRL 采用。
  • 2026年4月15日:🔥 TransferQueue 已被 Relax 采用!通过 StreamingDataLoader 抽象,它在集群中以微批次粒度调度训练数据,减少了单控制器设置中的同步壁垒。
  • 2026年4月10日:🔥 TransferQueue 现已正式集成到 verl在 128 × H100 GPU 集群上,我们实现了多模态训练后处理端到端性能提升 49.1%! 更多详情请参考 我们的博客
  • 2026年2月8日:🔥 PR#26PR#28 中的高级 API 极大简化了初始化和使用流程。现在,您可以使用类 Redis 风格的 API 来利用 TransferQueue 提供的大部分高级功能!
  • 2026年1月28日:我们实验性地引入了 StreamingDataLoader 接口,用于构建全流式的生产-消费管道。详情请参考我们的 tutorials/06_streaming_dataloader.py
  • 2025年12月30日TransferQueue 与 verl 的集成已在 DAPO 算法上完成大规模测试 (64 节点,1024 卡)。它显著优化了主机内存利用率并加速了数据传输。更多详情敬请期待!
  • 2025年12月20日:🔥 官方 教程 发布!欢迎查阅。
  • 2025年11月10日:我们将数据检索逻辑从 TransferQueueController 中解耦 PR#101。现在您可以实现自定义的 Sampler 来定制数据消费方式。
  • 2025年11月5日:我们提供了 KVStorageManager,简化了与基于 KV 的存储后端的集成 PR#96。首个可用的基于 KV 的后端是 openYuanrong
  • 2025年11月4日:数据分区功能在 PR#98 中可用。现在您可以定义逻辑数据分区来管理训练/验证/测试数据集。
  • 2025年10月25日:存储后端现已支持插件化 PR#66。您现在可以尝试将自己的存储后端与 TransferQueue 集成!
  • 2025年10月21日:与 verl 的早期集成已准备就绪 verl/pull/3649。后续 PR 将通过完全解耦数据与控制流来优化单控制器架构。
  • 2025年7月22日:我们在知乎上发布了一系列中文博客文章:知乎 12
  • 2025年7月21日:我们在 verl 社区发起了 RFC verl/RFC#2662
  • 2025年7月2日:我们发表了论文 AsyncFlow

🧩 组件

控制平面:全景数据管理

在控制平面中,TransferQueueController 将每个训练样本的生产状态消费状态作为元数据进行跟踪。当所有所需数据字段准备就绪(即已写入 StorageManager)后,该数据样本即可被下游任务消费。

我们还会跟踪每个计算任务(例如 generate_sequencescompute_log_prob 等)的消费历史。因此,即使不同的计算任务需要相同的数据字段,它们也能独立消费数据,互不干扰。

为了使数据检索过程更具可定制性,我们提供了 Sampler 类,允许用户定义自己的数据检索和消费逻辑。详情请参考自定义部分。

控制平面已实验性支持负载均衡能力。此设计使我们能够从单一控制器卸载部分数据管理功能。详情请参见#PR70

数据平面:分布式数据存储

在数据平面中,我们采用可插拔设计,使 TransferQueue 能够根据用户需求集成不同的存储后端。

具体而言,我们提供了 StorageManager 抽象类,其定义的核心 API 如下:

  • async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None
  • async def get_data(self, metadata: BatchMeta) -> TensorDict
  • async def clear_data(self, metadata: BatchMeta) -> None

该类封装了 TransferQueue 系统内的核心交互逻辑。您只需编写一个简单的子类,即可集成自定义的存储后端。详情请参考自定义部分。

目前,我们支持以下存储后端:

  • SimpleStorage:基础的 CPU 内存存储,数据格式约束最小,易于使用。
  • Yuanrong使用指南,测试版,#PR107#PR96):昇腾原生数据系统,提供包括 HBM/DRAM/SSD 在内的分层存储接口。
  • MooncakeStore(测试版,#PR162):高性能、基于 KV 的分层存储,支持 GPU 与 DRAM 之间的 RDMA 传输。
  • RayRDT(alpha 版,#PR167):Ray 的新特性,允许 Ray 在 Ray 角色之间直接存储和传递对象。

其中,SimpleStorageUnit 作为我们的默认存储后端,由 AsyncSimpleStorageManager 类进行协调。每个存储单元可部署在独立节点上,实现分布式数据管理。

SimpleStorageUnit 采用如下二维数据结构:

  • 每一行对应一个训练样本,在相应的全局批次中分配有唯一索引。
  • 每一列代表计算任务的输入/输出数据字段。

这种数据结构设计源于训练后处理过程的计算特性,即每个训练样本在任务流水线中以接力方式生成。它提供了精确的寻址能力,支持以流式方式进行细粒度、并发的数据读写操作。

用户界面:高层级与低层级 API

层级 类型 风格 细粒度访问 流式处理 采样器 多后端支持
高层 KV 接口 (PR#28) Put/Get/List/Clear
高层 StreamingDataLoader (PR#23) PyTorch DataLoader
低层 TransferQueueClient 基于元数据

基于键值(Key-Value)的 API

为简化 TransferQueue 的使用,我们提供了类 Redis 风格的高层级 API,该 API 可暴露其大部分高级功能(PR#28)。

方法

  • (async_)kv_put:通过键插入/更新多列样本,可选择添加元数据标签。
  • (async_)kv_batch_put:高效批量插入多个键值对。
  • (async_)kv_batch_get:(通过键)检索样本,支持(按字段)选择列。
  • (async_)kv_list:列出分区中的键和标签(元数据)。
  • (async_)kv_clear:从存储中移除键值对。

核心特性

  • 类 Redis 语义:熟悉的 KV 接口(Put/Get/List),实现零学习成本。
  • 细粒度访问:可更新或检索键(行)内的特定字段(列),无需执行整行操作。
  • 分区隔离:存储命名空间的逻辑分离。
  • 元数据标签:用于状态跟踪的轻量级元数据。
  • 可插拔后端:支持多种后端。

详细使用示例请参考 tutorials/basic.ipynbtutorials/02_kv_interface.py

StreamingDataLoader API

该 API 旨在作为标准 PyTorch DataLoader 的即插即用替代品,允许每个进程级(rank)自动消费数据,无需单一控制器干预。

在此场景中,TransferQueueController 充当数据分发的辅助控制器,配合用户定义的 Sampler 类来组织数据流。 它封装了各种并行策略所需的复杂调度和数据传输逻辑,将 TransferQueue 无缝集成到现有训练工作流中,并简化了分布式框架的开发。

更多详情请参见 路线图tutorials/06_streaming_dataloader.py

低层级原生 API

TransferQueue 的原生接口在 TransferQueueClient 中实现。它通过原生的原子操作提供最大灵活性。

开发人员可直接利用 TransferQueueClient 实现需要细粒度控制和全流式数据调度的高级功能,相关示例请参见以下教程:

🔥 应用案例

同构部署示例

verl

将 TransferQueue 集成到 verl 中的主要目的是缓解单一控制器 RayPPOTrainer 的数据传输瓶颈。当前,所有 DataProto 对象都必须通过 RayPPOTrainer 进行路由,这导致整个训练后系统存在单点瓶颈。

verl 的官方集成版本可在 verl/pull/5401 获取,设计文档请参见 [RFC] PPOTrainer with TransferQueue Integration。您也可以参考我们的 示例代码,其中我们以高级方式模拟了 verl 的使用场景。

异构部署示例

我们已通过 TransferQueue 实验性地实现了标准化、全流式的分布式工作流。

通过利用 RankAwareSamplerStreamingDataLoader 接口,我们构建了精简的微批处理级生产者-消费者管道。这种设计无需手动确定不同并行策略下的数据分发逻辑(这是单一控制器模式中的典型复杂性),从而极大简化了框架设计。

更多详情请参考我们的 路线图tutorials/05_streaming_dataloader.py

🚀 快速开始

使用 Python 包

pip install TransferQueue

从源代码安装

  1. 从 GitHub 仓库克隆源代码

    git clone https://github.com/Ascend/TransferQueue/
    cd TransferQueue
    
  2. 从源代码安装

    pip install .
    

从源代码构建 wheel 包

  1. 从 GitHub 仓库克隆源代码

    git clone https://github.com/Ascend/TransferQueue/
    cd TransferQueue
    
  2. 安装依赖

    pip install build
    
  3. 构建并安装

    python -m build --wheel
    pip install dist/*.whl
    

📊 性能表现

简单场景:常规 Tensor

复杂场景:常规 Tensor + NestedTensor + NonTensor

注意:openYuanrong 基准测试仅使用单个 NPU,因此无法体现多 NPU 的扩展性。此外,openYuanrong 的测试硬件环境与其他后端不同。

有关详细的性能基准测试,请参考 完整基准测试报告

压力测试

除吞吐量外,我们还验证了高并发下的稳定性。我们提供了一份 压力测试报告,其中展示了在 4 个节点上,8192 个并发客户端向 TransferQueue 写入 2 TB 数据的场景。系统保持稳定,未出现任何崩溃或数据丢失。

🛠️ 自定义 TransferQueue

定义您自己的数据检索逻辑

我们提供了一个 BaseSampler 抽象类,它定义了以下接口:

@abstractmethod
def sample(
    self,
    ready_indexes: list[int],
    batch_size: int,
    *args: Any,
    **kwargs: Any,
) -> tuple[list[int], list[int]]:
    """Sample a batch of indices from the ready indices.

    Args:
        ready_indexes: List of global indices for which all required fields of the
        corresponding samples have been produced, and the samples are not labeled as
        consumed in the corresponding task.
        batch_size: Number of samples to select
        *args: Additional positional arguments for specific sampler implementations
        **kwargs: Additional keyword arguments for specific sampler implementations

    Returns:
        List of sampled global indices of length batch_size
        List of global indices of length batch_size that should be labeled as consumed
        (will never be retrieved in the future)

    Raises:
        ValueError: If batch_size is invalid or ready_indexes is insufficient
    """
    raise NotImplementedError("Subclasses must implement sample")

在本设计中,我们通过两个返回值将数据获取和数据消费分离,这样可以轻松控制样本替换。我们已实现两种参考设计:SequentialSamplerGRPOGroupNSampler

Sampler 类或实例应在初始化期间传递给 TransferQueueController。在每次 get_meta 调用时,您可以向 Sampler 提供动态采样参数。

from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info

# Option 1: Pass the sampler class to the TransferQueueController
controller = TransferQueueController.remote(GRPOGroupNSampler)

# Option 2: Pass the sampler instance to the TransferQueueController (if you need custom configuration)
your_own_sampler = YourOwnSampler(config)
controller = TransferQueueController.remote(your_own_sampler)

# Use the sampler
batch_meta = client.get_meta(
    data_fields=["input_ids", "attention_mask"],
    batch_size=8,
    partition_id="train_0",
    task_name="generate_sequences",
)

有关更多详细信息,请参阅 tutorial/05_custom_sampler.py

如何集成新的存储后端

数据平面的组织结构如下:

  transfer_queue/
  ├── storage/
  │   ├── __init__.py
  │   │── simple_backend.py             # Default distributed storage backend (SimpleStorageUnit) by TQ 
  │   ├── managers/                     # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
  │   │   ├── __init__.py
  │   │   ├──base.py                    # StorageManager, KVStorageManager, StorageManagerFactory
  │   │   ├──simple_storage_manager.py  # AsyncSimpleStorageManager
  │   │   ├──yuanrong_manager.py        # YuanrongStorageManager
  │   │   └──mooncake_manager.py        # MooncakeStorageManager
  │   └── clients/                      # Clients are lower level interfaces that directly manipulate the target storage backend.
  │   │   ├── __init__.py
  │   │   ├── base.py                   # StorageKVClient, StorageClientFactory
  │   │   ├── yuanrong_client.py        # YuanrongStorageClient
  │   │   ├── mooncake_client.py        # MooncakeStorageClient
  │   │   └── ray_storage_client.py     # RayStorageClient

要将 TransferQueue 与自定义存储后端集成,首先需实现一个继承自 StorageManager 的子类。该子类充当 TransferQueue 系统与目标存储后端之间的适配器。对于基于 KV 的存储后端,您只需继承 KVStorageManager,它可用作所有基于 KV 的后端的通用管理器。

分布式存储后端通常都有自己的原生客户端作为存储系统的接口。在这种情况下,可以参照 storage/clients 目录中提供的示例,为该客户端编写一个底层适配器。

系统为 StorageManagerStorageClient 均提供了工厂类,以方便集成。在工厂类中添加必要的必填参数说明有助于提升整体用户体验。

✏️ 贡献指南

热烈欢迎贡献!

我们鼓励提出新想法、功能建议和用户体验反馈,您可以随时提交 issue 或 PR。我们会尽快回复。

建议使用 pre-commit 以获得更好的代码格式。

# install pre-commit
pip install pre-commit

# run the following command in your repo folder, then fix the check before committing your code
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always

📑 引用

如果您发现本仓库对您有所帮助,恳请引用我们的论文:
@article{han2025asyncflow,
  title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
  author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
  journal={arXiv preprint arXiv:2507.01663},
  year={2025}
}

项目介绍

异步高性能流式数据引擎

定制我的领域