AsyncPreprocessIterableDataset
概述
AsyncPreprocessIterableDataset 是面向流式数据场景的异步预处理包装器,用于在保持样本逻辑顺序不变的前提下,将单条样本的预处理工作从训练主线程中解耦出来,交由多个后台 worker 并发执行。
当前仓库已经在两条训练链路中提供了该能力:
- Megatron 路径实现位于
mindspeed_mm/data/datasets/qwen2vl_dataset.py - FSDP2 路径实现位于
mindspeed_mm/fsdp/data/datasets/huggingface/qwen2vl_dataset.py
它主要服务于 HuggingFace streaming=True 的 IterableDataset 场景,适用于文本、图像、视频、音频等多模态样本在训练前需要进行模板拼接、tokenize、模态输入整理等较重 CPU 预处理的任务。
该特性的核心目标包括:
- 降低训练主循环等待数据预处理的时间。
- 保持数据并行副本之间的样本顺序一致性。
- 在不改写现有
preprocess_fn签名的前提下复用已有预处理逻辑。
动机与背景
在非流式模式下,训练数据通常可以先通过 map 一次性完成预处理,再交给 DataLoader 消费;但在流式模式下,数据本身是一个持续产出的可迭代对象,训练过程需要边读边处理,此时会遇到几个典型问题:
- 单条样本预处理开销大。多模态样本往往包含模板构造、图像/视频信息整理、音频特征准备和 tokenizer 编码等步骤,如果全部放在训练主线程里串行执行,会直接拉低吞吐。
- 上游流式数据源不适合被多个线程同时直接消费。若多个 worker 同时遍历同一个
IterableDataset,很容易引入重复读取、顺序错乱或不同 rank 样本偏移不一致的问题。 - 依赖
len(dataset)的 sampler 不适用于这类数据。仓库中的BaseRandomBatchSampler在初始化时会直接读取数据集长度,而AsyncPreprocessIterableDataset本身属于IterableDataset,不具备固定长度语义。 - 流式训练要求顺序稳定。即使引入并发,也必须保证训练侧看到的样本顺序与上游逻辑顺序一致,否则断点续训、对齐验证和多副本一致性都会受到影响。
基于这些约束,AsyncPreprocessIterableDataset 并不是并发读取上游流,而是采用“单线程顺序读取 + 多 worker 并发预处理 + 按序重排输出”的设计。
设计方案
AsyncPreprocessIterableDataset 的整体处理链路如下:
load_dataset(..., streaming=True)
-> align_dataset(...)
-> DistributedIterableDataset
-> AsyncPreprocessIterableDataset
-> DataLoader / StatefulDataLoader
-> DataCollator
-> 模型前向与训练循环
其中的关键设计点如下:
-
先做分布式分片,再做异步预处理。
- Megatron 路径中,训练集在
get_qwen2vl_dataset()内先包装为DistributedIterableDataset,再按需包装为AsyncPreprocessIterableDataset。 - FSDP2 路径中,处理顺序与 Megatron 保持一致,先保证 DP 分片,再启动异步预处理。
- Megatron 路径中,训练集在
-
上游 iterable 始终只由一个 producer 顺序消费。
- producer 负责给每条样本分配全局递增的
sequence_idx,这是后续重排的唯一依据。
- producer 负责给每条样本分配全局递增的
-
预处理工作由多个后台 worker 并发执行。
- worker 不直接访问上游 dataset,只处理已经进入任务队列的样本,从而避免多线程竞争上游迭代器。
-
结果输出阶段按
sequence_idx做重排。- 即使不同 worker 的完成顺序不同,主迭代器最终仍按原始顺序向训练侧产出结果。
-
配置层通过
async_preprocess与async_preprocess_buffer_size控制该能力。- Megatron 配置定义位于
mindspeed_mm/data/data_utils/func_utils/convert.py - FSDP2 配置定义位于
mindspeed_mm/fsdp/data/data_utils/func_utils/convert.py
- Megatron 配置定义位于
在当前实现下,如果训练数据开启了 streaming: true 并同时设置 async_preprocess: true,数据集构建流程就会进入这一异步路径。
核心机制详解
1. 配置归一化
类初始化时会先归一化 buffer_size 与 num_workers:
- 当二者都未配置时,默认
buffer_size=8,num_workers取min(buffer_size, cpu_count)。 - 当只配置了
buffer_size时,num_workers自动取不超过 CPU 数量的合理值。 - 当只配置了
num_workers时,buffer_size会回落为同样大小。
这套规则的目的是在默认情况下平衡吞吐与内存占用,避免队列过浅导致 worker 饥饿,或队列过深造成额外缓存压力。
2. 单条样本批量化适配
仓库中的 preprocess_fn 以 batch 字典为输入,因此 AsyncPreprocessIterableDataset 不会直接把单条样本传给它,而是会先执行一次轻量封装:
- 将单条样本的每个字段包装成长度为 1 的列表。
- 调用已有的
preprocess_fn(batch_dict)。 - 再把返回的 batched 结果拆回单样本列表。
这种做法保证了流式异步预处理可以复用现有预处理器,而不需要为 streaming 场景额外实现一套单样本版本。
3. producer 线程负责顺序读取
__iter__() 内部会启动一个 producer 线程,专门顺序遍历上游 dataset。producer 线程只负责两件事:
- 为每条样本生成递增的
sequence_idx。 - 把
(sequence_idx, item)放入task_queue。
这样可以保证上游流式数据源始终只有一个读取入口,从根源上避免并发读取带来的顺序不确定性。
4. worker 线程负责并发预处理
多个 worker 线程从 task_queue 中取出任务并执行 _preprocess_item()。每个 worker 处理完成后,会将结果以 (message_type, payload, extra) 的形式写入 result_queue。
当前实现里主要存在三类消息:
result:表示某个sequence_idx对应的样本预处理完成。done:表示某个 worker 已经处理完所有任务并退出。error:表示 producer 或 worker 内部抛出了异常,需要终止整个迭代链路。
5. 按序重排保证输出稳定
由于不同样本的预处理耗时不同,worker 返回结果的顺序通常是不稳定的。为保证训练侧观察到的样本顺序与上游一致,主迭代器会维护两个状态:
pending_results:暂存已经完成但还不能立即输出的结果。next_sequence_idx:下一个允许对外产出的样本编号。
主迭代器只有在 pending_results 中找到 next_sequence_idx 时才会真正 yield,并在输出后将 next_sequence_idx 递增。这样即便结果异步完成,最终对训练主循环暴露的顺序仍然稳定。
6. 分布式一致性
AsyncPreprocessIterableDataset 本身不做 DP 分片,它依赖外层的 DistributedIterableDataset 先完成每个数据并行副本的数据切分:
- Megatron 侧通过
mpu.get_data_parallel_world_size()与mpu.get_data_parallel_rank()计算本 rank 应消费的子序列。 - FSDP2 侧通过并行状态和当前 rank 计算数据并行组内的分片归属。
这样做的好处是每个 DP 副本只会对属于自己的样本子流做异步预处理,避免多个副本看到相同样本,也避免跨副本顺序不一致。
7. 异常传播与资源回收
实现中使用 stop_event、任务结束哨兵和统一的错误消息来保证异常时可以快速退出:
- 如果 producer 遍历上游数据时出错,会把错误写入
result_queue。 - 如果 worker 在执行预处理时出错,也会立刻发送错误消息并触发
stop_event。 - 主线程收到错误消息后会中止迭代,并将异常重新抛回训练侧。
finally分支会统一回收后台线程,避免出现悬挂线程。
使用示例
1. Megatron 示例:Qwen2.5Omni
Megatron 路径下,Qwen2.5Omni 的训练入口脚本为 examples/qwen2.5omni/finetune_qwen2_5_omni_7b.sh,训练程序入口为 pretrain_vlm.py,默认数据配置文件为 examples/qwen2.5omni/data_7b.json。
若要启用 AsyncPreprocessIterableDataset,建议在数据配置中至少增加以下字段:
{
"dataset_param": {
"dataset_type": "huggingface",
"preprocess_parameters": {
"model_name_or_path": "./ckpt/hf_path/Qwen2.5-Omni-7B",
"use_fast_tokenizer": true,
"split_special_tokens": false,
"image_max_pixels": 262144,
"image_min_pixels": 0,
"video_max_pixels": 16384,
"video_min_pixels": 0,
"video_fps": 2.0,
"video_maxlen": 128
},
"basic_parameters": {
"template": "qwen2_omni",
"dataset_dir": "./data",
"dataset": "./data/mllm_format_llava_instruct_data.json",
"cache_dir": "./data/cache_dir",
"train_on_prompt": false,
"mask_history": false,
"preprocessing_batch_size": 1000,
"preprocessing_num_workers": 8,
"async_preprocess": true,
"async_preprocess_buffer_size": 16,
"streaming": true,
"max_samples": null,
"tool_format": null
},
"attr": {
"system": null,
"images": null,
"videos": "videos",
"audios": "audios",
"messages": "messages",
"role_tag": "role",
"content_tag": "content",
"user_tag": "user",
"assistant_tag": "assistant",
"observation_tag": null,
"function_tag": null,
"system_tag": null
}
},
"dataloader_param": {
"dataloader_mode": "base",
"drop_last": true,
"collate_param": {
"model_name": "qwen2vl",
"ignore_pad_token_for_loss": true
},
"pin_memory": true,
"shuffle": false
}
}
推荐同时将脚本中的 --num-workers 调整为 0,避免 DataLoader 再启动额外 worker 去复制流式数据迭代器。因为当前异步能力已经由 preprocessing_num_workers 承担,外层 DataLoader 再并发通常不会带来额外收益,反而可能引入重复读取风险。
启动方式示例如下:
bash examples/qwen2.5omni/finetune_qwen2_5_omni_7b.sh
2. FSDP2 示例:Qwen3Omni
FSDP2 路径下,Qwen3Omni 的训练入口脚本为 examples/qwen3omni/finetune_qwen3omni.sh,训练程序入口为 pretrain_transformers.py。该脚本已经开启 --use-torch-fsdp2,默认数据配置文件为 examples/qwen3omni/data.json,FSDP2 配置文件为 examples/qwen3omni/fsdp2_config.yaml。
若要启用 AsyncPreprocessIterableDataset,可在 examples/qwen3omni/data.json 中按如下方式修改:
{
"dataset_param": {
"dataset_type": "huggingface",
"preprocess_parameters": {
"model_name_or_path": "./ckpt/hf_path/Qwen3-Omni-30B-A3B-Instruct",
"use_fast_tokenizer": true,
"split_special_tokens": false,
"use_audio_in_video": false,
"image_max_pixels": 262144,
"image_min_pixels": 1024,
"video_max_pixels": 16384,
"video_min_pixels": 256,
"video_fps": 2.0,
"video_maxlen": 128,
"audio_sampling_rate": 16000
},
"basic_parameters": {
"template": "qwen3_omni",
"dataset_dir": "./data",
"dataset": "./data/mllm_format_llava_instruct_data.json",
"cache_dir": "./data/cache_dir",
"overwrite_cache": false,
"train_on_prompt": false,
"mask_history": false,
"preprocessing_batch_size": 1000,
"preprocessing_num_workers": 8,
"async_preprocess": true,
"async_preprocess_buffer_size": 16,
"streaming": true,
"max_samples": 10000,
"tool_format": null
},
"attr": {
"system": null,
"images": "images",
"videos": null,
"audios": null,
"messages": "messages",
"role_tag": "role",
"content_tag": "content",
"user_tag": "user",
"assistant_tag": "assistant",
"observation_tag": null,
"function_tag": null,
"system_tag": null
}
},
"dataloader_param": {
"dataloader_mode": "base",
"drop_last": true,
"collate_param": {
"model_name": "qwen3omni",
"ignore_pad_token_for_loss": true
},
"pin_memory": true,
"shuffle": false
}
}
当前官方 Qwen3Omni 脚本默认 MBS=1,因此在该样例中切换为 base 模式后仍可直接配合流式异步预处理使用。同样建议将脚本中的 --num-workers 调整为 0,避免外层 DataLoader 与内部异步 worker 产生重复并发。
启动方式示例如下:
bash examples/qwen3omni/finetune_qwen3omni.sh
3. 使用建议
preprocessing_num_workers建议从 4 或 8 起步,根据 CPU 资源和样本复杂度逐步调优。async_preprocess_buffer_size通常建议不小于preprocessing_num_workers,以减少队列阻塞。- 若优先关注可复现性,建议在开启该特性时同步固定随机种子,并先关闭其他会改变样本顺序的随机化配置。
- 若当前配置仍使用依赖
len(dataset)的 batch sampler,应先切换到不依赖数据集长度的加载方式,再启用流式异步预处理。