little demo模型迁移样例

前提条件

需要用户构建容器基础环境,请参考:torch_examples README

表 1 little-demo文件说明

文件名 说明
main.py 模型训练入口
dataset.py 数据集生成
model.py 模型文件
bash.sh 启动脚本
README.md demo模型运行说明

模型介绍

本样例以一个最基础的模型来介绍RecSDK-Torch的使用样例。搭建一个基于RecSDK-Torch的训练任务步骤如下:

1. 定义Batch

将本次训练需要的所有特征整合为一个Batch类。并实现to()、pin_memory()、record_stream()方法。完整代码参考dataset.py文件。

@dataclass
class Batch(Pipelineable):
    ...

2. 定义Dataset

实现一个返回Batch的Dataset。完整代码参考dataset.py文件。

class RandomRecDataset(IterableDataset[Batch]):
    ...

3. 初始化分布式变量

创建host和device侧的链接。完整代码参考main.py文件。

......
dist.init_process_group(backend="hccl")
host_gp = dist.new_group(backend="gloo")
host_env = ShardingEnv(world_size=world_size, rank=rank, pg=host_gp)

4. 定义模型

将稀疏表部分和Dense部分整合为一个Module。该module的输入必须要上述环节中定义的Batch类。返回为模型的loss和输出。完整代码参考model.py。

class TestModel(torch.nn.Module):
    def __init__(self, table_names, feat_names, embed_dims, num_embeds):
        self.ebc = HashEmbeddingBagCollection(...)

    def forward(self, batch: Batch):
        ...
        return loss, result

5. 定义稀疏表的优化器

指定sparse侧的优化器

test_model = TestModel(...)

# Optimizer
embedding_optimizer = torch.optim.Adagrad
optimizer_kwargs = {"lr": 0.001, "eps": 0.1}
apply_optimizer_in_backward(
    embedding_optimizer,
    test_model.ebc.parameters(),
    optimizer_kwargs=optimizer_kwargs,
)

6. 对稀疏表做分表

创建sharder,并使用EmbeddingShardingPlanner创建分表计划,将模型、分表计划和sharder传入DistributedModelParallel中获得分布式模型。注意当前支持row-wise和fused模式。完整代码参考main.py。

    hybrid_sharder = get_default_hybrid_sharders(host_env)
    constraints = {...}
    planner = EmbeddingShardingPlanner(...)
    plan = planner.collective_plan(...)
    logging.info(plan)
    ddp_model = DistributedModelParallel(
        test_model, device=torch.device("npu"), plan=plan, sharders=hybrid_sharder
    )

7. 整合优化器

分离dense和sparse的参数,并组合成一个新的优化器。完整代码参考main.py。

    dense_optimizer = KeyedOptimizerWrapper(
        dict(in_backward_optimizer_filter(ddp_model.named_parameters())),
        lambda params: torch.optim.Adagrad(params, lr=0.1),
    )
    optimizer = CombinedOptimizer([ddp_model.fused_optimizer, dense_optimizer])

8. 创建pipeline

完整代码参考main.py

pipeline = HybridTrainPipelineSparseDist(
    ddp_model, optimizer, device, execute_all_batches=True
)

9. 使用pipeline进行训练

完整代码参考main.py

batched_iterator = iter(data_loader)
for i in range(10):
    pipeline.progress(batched_iterator)

运行脚本

单卡运行

WORLD_SIZE=1 RANK=0 python main.py

多卡运行

运行脚本启动训练:

bash bash.sh

成功后出现demo done字样。