AsyncPreprocessIterableDataset

概述

AsyncPreprocessIterableDataset 是面向流式数据场景的异步预处理包装器,用于在保持样本逻辑顺序不变的前提下,将单条样本的预处理工作从训练主线程中解耦出来,交由多个后台 worker 并发执行。

当前仓库已经在两条训练链路中提供了该能力:

  • Megatron 路径实现位于 mindspeed_mm/data/datasets/qwen2vl_dataset.py
  • FSDP2 路径实现位于 mindspeed_mm/fsdp/data/datasets/huggingface/qwen2vl_dataset.py

它主要服务于 HuggingFace streaming=TrueIterableDataset 场景,适用于文本、图像、视频、音频等多模态样本在训练前需要进行模板拼接、tokenize、模态输入整理等较重 CPU 预处理的任务。

该特性的核心目标包括:

  1. 降低训练主循环等待数据预处理的时间。
  2. 保持数据并行副本之间的样本顺序一致性。
  3. 在不改写现有 preprocess_fn 签名的前提下复用已有预处理逻辑。

动机与背景

在非流式模式下,训练数据通常可以先通过 map 一次性完成预处理,再交给 DataLoader 消费;但在流式模式下,数据本身是一个持续产出的可迭代对象,训练过程需要边读边处理,此时会遇到几个典型问题:

  1. 单条样本预处理开销大。多模态样本往往包含模板构造、图像/视频信息整理、音频特征准备和 tokenizer 编码等步骤,如果全部放在训练主线程里串行执行,会直接拉低吞吐。
  2. 上游流式数据源不适合被多个线程同时直接消费。若多个 worker 同时遍历同一个 IterableDataset,很容易引入重复读取、顺序错乱或不同 rank 样本偏移不一致的问题。
  3. 依赖 len(dataset) 的 sampler 不适用于这类数据。仓库中的 BaseRandomBatchSampler 在初始化时会直接读取数据集长度,而 AsyncPreprocessIterableDataset 本身属于 IterableDataset,不具备固定长度语义。
  4. 流式训练要求顺序稳定。即使引入并发,也必须保证训练侧看到的样本顺序与上游逻辑顺序一致,否则断点续训、对齐验证和多副本一致性都会受到影响。

基于这些约束,AsyncPreprocessIterableDataset 并不是并发读取上游流,而是采用“单线程顺序读取 + 多 worker 并发预处理 + 按序重排输出”的设计。

设计方案

AsyncPreprocessIterableDataset 的整体处理链路如下:

load_dataset(..., streaming=True)
    -> align_dataset(...)
    -> DistributedIterableDataset
    -> AsyncPreprocessIterableDataset
    -> DataLoader / StatefulDataLoader
    -> DataCollator
    -> 模型前向与训练循环

其中的关键设计点如下:

  1. 先做分布式分片,再做异步预处理。

    • Megatron 路径中,训练集在 get_qwen2vl_dataset() 内先包装为 DistributedIterableDataset,再按需包装为 AsyncPreprocessIterableDataset
    • FSDP2 路径中,处理顺序与 Megatron 保持一致,先保证 DP 分片,再启动异步预处理。
  2. 上游 iterable 始终只由一个 producer 顺序消费。

    • producer 负责给每条样本分配全局递增的 sequence_idx,这是后续重排的唯一依据。
  3. 预处理工作由多个后台 worker 并发执行。

    • worker 不直接访问上游 dataset,只处理已经进入任务队列的样本,从而避免多线程竞争上游迭代器。
  4. 结果输出阶段按 sequence_idx 做重排。

    • 即使不同 worker 的完成顺序不同,主迭代器最终仍按原始顺序向训练侧产出结果。
  5. 配置层通过 async_preprocessasync_preprocess_buffer_size 控制该能力。

    • Megatron 配置定义位于 mindspeed_mm/data/data_utils/func_utils/convert.py
    • FSDP2 配置定义位于 mindspeed_mm/fsdp/data/data_utils/func_utils/convert.py

在当前实现下,如果训练数据开启了 streaming: true 并同时设置 async_preprocess: true,数据集构建流程就会进入这一异步路径。

核心机制详解

1. 配置归一化

类初始化时会先归一化 buffer_sizenum_workers

  • 当二者都未配置时,默认 buffer_size=8num_workersmin(buffer_size, cpu_count)
  • 当只配置了 buffer_size 时,num_workers 自动取不超过 CPU 数量的合理值。
  • 当只配置了 num_workers 时,buffer_size 会回落为同样大小。

这套规则的目的是在默认情况下平衡吞吐与内存占用,避免队列过浅导致 worker 饥饿,或队列过深造成额外缓存压力。

2. 单条样本批量化适配

仓库中的 preprocess_fn 以 batch 字典为输入,因此 AsyncPreprocessIterableDataset 不会直接把单条样本传给它,而是会先执行一次轻量封装:

  1. 将单条样本的每个字段包装成长度为 1 的列表。
  2. 调用已有的 preprocess_fn(batch_dict)
  3. 再把返回的 batched 结果拆回单样本列表。

这种做法保证了流式异步预处理可以复用现有预处理器,而不需要为 streaming 场景额外实现一套单样本版本。

3. producer 线程负责顺序读取

__iter__() 内部会启动一个 producer 线程,专门顺序遍历上游 dataset。producer 线程只负责两件事:

  1. 为每条样本生成递增的 sequence_idx
  2. (sequence_idx, item) 放入 task_queue

这样可以保证上游流式数据源始终只有一个读取入口,从根源上避免并发读取带来的顺序不确定性。

4. worker 线程负责并发预处理

多个 worker 线程从 task_queue 中取出任务并执行 _preprocess_item()。每个 worker 处理完成后,会将结果以 (message_type, payload, extra) 的形式写入 result_queue

当前实现里主要存在三类消息:

  1. result:表示某个 sequence_idx 对应的样本预处理完成。
  2. done:表示某个 worker 已经处理完所有任务并退出。
  3. error:表示 producer 或 worker 内部抛出了异常,需要终止整个迭代链路。

5. 按序重排保证输出稳定

由于不同样本的预处理耗时不同,worker 返回结果的顺序通常是不稳定的。为保证训练侧观察到的样本顺序与上游一致,主迭代器会维护两个状态:

  • pending_results:暂存已经完成但还不能立即输出的结果。
  • next_sequence_idx:下一个允许对外产出的样本编号。

主迭代器只有在 pending_results 中找到 next_sequence_idx 时才会真正 yield,并在输出后将 next_sequence_idx 递增。这样即便结果异步完成,最终对训练主循环暴露的顺序仍然稳定。

6. 分布式一致性

AsyncPreprocessIterableDataset 本身不做 DP 分片,它依赖外层的 DistributedIterableDataset 先完成每个数据并行副本的数据切分:

  • Megatron 侧通过 mpu.get_data_parallel_world_size()mpu.get_data_parallel_rank() 计算本 rank 应消费的子序列。
  • FSDP2 侧通过并行状态和当前 rank 计算数据并行组内的分片归属。

这样做的好处是每个 DP 副本只会对属于自己的样本子流做异步预处理,避免多个副本看到相同样本,也避免跨副本顺序不一致。

7. 异常传播与资源回收

实现中使用 stop_event、任务结束哨兵和统一的错误消息来保证异常时可以快速退出:

  1. 如果 producer 遍历上游数据时出错,会把错误写入 result_queue
  2. 如果 worker 在执行预处理时出错,也会立刻发送错误消息并触发 stop_event
  3. 主线程收到错误消息后会中止迭代,并将异常重新抛回训练侧。
  4. finally 分支会统一回收后台线程,避免出现悬挂线程。

使用示例

1. Megatron 示例:Qwen2.5Omni

Megatron 路径下,Qwen2.5Omni 的训练入口脚本为 examples/qwen2.5omni/finetune_qwen2_5_omni_7b.sh,训练程序入口为 pretrain_vlm.py,默认数据配置文件为 examples/qwen2.5omni/data_7b.json

若要启用 AsyncPreprocessIterableDataset,建议在数据配置中至少增加以下字段:

{
    "dataset_param": {
        "dataset_type": "huggingface",
        "preprocess_parameters": {
            "model_name_or_path": "./ckpt/hf_path/Qwen2.5-Omni-7B",
            "use_fast_tokenizer": true,
            "split_special_tokens": false,
            "image_max_pixels": 262144,
            "image_min_pixels": 0,
            "video_max_pixels": 16384,
            "video_min_pixels": 0,
            "video_fps": 2.0,
            "video_maxlen": 128
        },
        "basic_parameters": {
            "template": "qwen2_omni",
            "dataset_dir": "./data",
            "dataset": "./data/mllm_format_llava_instruct_data.json",
            "cache_dir": "./data/cache_dir",
            "train_on_prompt": false,
            "mask_history": false,
            "preprocessing_batch_size": 1000,
            "preprocessing_num_workers": 8,
            "async_preprocess": true,
            "async_preprocess_buffer_size": 16,
            "streaming": true,
            "max_samples": null,
            "tool_format": null
        },
        "attr": {
            "system": null,
            "images": null,
            "videos": "videos",
            "audios": "audios",
            "messages": "messages",
            "role_tag": "role",
            "content_tag": "content",
            "user_tag": "user",
            "assistant_tag": "assistant",
            "observation_tag": null,
            "function_tag": null,
            "system_tag": null
        }
    },
    "dataloader_param": {
        "dataloader_mode": "base",
        "drop_last": true,
        "collate_param": {
            "model_name": "qwen2vl",
            "ignore_pad_token_for_loss": true
        },
        "pin_memory": true,
        "shuffle": false
    }
}

推荐同时将脚本中的 --num-workers 调整为 0,避免 DataLoader 再启动额外 worker 去复制流式数据迭代器。因为当前异步能力已经由 preprocessing_num_workers 承担,外层 DataLoader 再并发通常不会带来额外收益,反而可能引入重复读取风险。

启动方式示例如下:

bash examples/qwen2.5omni/finetune_qwen2_5_omni_7b.sh

2. FSDP2 示例:Qwen3Omni

FSDP2 路径下,Qwen3Omni 的训练入口脚本为 examples/qwen3omni/finetune_qwen3omni.sh,训练程序入口为 pretrain_transformers.py。该脚本已经开启 --use-torch-fsdp2,默认数据配置文件为 examples/qwen3omni/data.json,FSDP2 配置文件为 examples/qwen3omni/fsdp2_config.yaml

若要启用 AsyncPreprocessIterableDataset,可在 examples/qwen3omni/data.json 中按如下方式修改:

{
    "dataset_param": {
        "dataset_type": "huggingface",
        "preprocess_parameters": {
            "model_name_or_path": "./ckpt/hf_path/Qwen3-Omni-30B-A3B-Instruct",
            "use_fast_tokenizer": true,
            "split_special_tokens": false,
            "use_audio_in_video": false,
            "image_max_pixels": 262144,
            "image_min_pixels": 1024,
            "video_max_pixels": 16384,
            "video_min_pixels": 256,
            "video_fps": 2.0,
            "video_maxlen": 128,
            "audio_sampling_rate": 16000
        },
        "basic_parameters": {
            "template": "qwen3_omni",
            "dataset_dir": "./data",
            "dataset": "./data/mllm_format_llava_instruct_data.json",
            "cache_dir": "./data/cache_dir",
            "overwrite_cache": false,
            "train_on_prompt": false,
            "mask_history": false,
            "preprocessing_batch_size": 1000,
            "preprocessing_num_workers": 8,
            "async_preprocess": true,
            "async_preprocess_buffer_size": 16,
            "streaming": true,
            "max_samples": 10000,
            "tool_format": null
        },
        "attr": {
            "system": null,
            "images": "images",
            "videos": null,
            "audios": null,
            "messages": "messages",
            "role_tag": "role",
            "content_tag": "content",
            "user_tag": "user",
            "assistant_tag": "assistant",
            "observation_tag": null,
            "function_tag": null,
            "system_tag": null
        }
    },
    "dataloader_param": {
        "dataloader_mode": "base",
        "drop_last": true,
        "collate_param": {
            "model_name": "qwen3omni",
            "ignore_pad_token_for_loss": true
        },
        "pin_memory": true,
        "shuffle": false
    }
}

当前官方 Qwen3Omni 脚本默认 MBS=1,因此在该样例中切换为 base 模式后仍可直接配合流式异步预处理使用。同样建议将脚本中的 --num-workers 调整为 0,避免外层 DataLoader 与内部异步 worker 产生重复并发。

启动方式示例如下:

bash examples/qwen3omni/finetune_qwen3omni.sh

3. 使用建议

  1. preprocessing_num_workers 建议从 4 或 8 起步,根据 CPU 资源和样本复杂度逐步调优。
  2. async_preprocess_buffer_size 通常建议不小于 preprocessing_num_workers,以减少队列阻塞。
  3. 若优先关注可复现性,建议在开启该特性时同步固定随机种子,并先关闭其他会改变样本顺序的随机化配置。
  4. 若当前配置仍使用依赖 len(dataset) 的 batch sampler,应先切换到不依赖数据集长度的加载方式,再启用流式异步预处理。