compress-dense
背景与挑战
当训练使能不同PP并行策略时,pp_stage=0的情况下,多个warmup阶段所保存的激活值会导致首节点与尾节点内存不均衡。
当存在减小内存的需求时,可以针对首节点dense层激活值进行压缩,以达到节省内存的目的。
解决方案
激活值在训练过程中属于高频使用的内存对象,前向产生反向释放,并且不同的并行策略会导致激活值的生命周期产生复杂的变化,因此需要合理并安全的设计可异步的无损压缩方案。通过使能compress-dense的特性,可以利用多流异步的方案实现对大量Dense-层激活值小代价的压缩,具体方法如下图所示:

使用方法
当需要对Dense层MLP产生的激活值进行重计算时候,可通过在训练参数中增加--compress-dense level1 或者 --compress-dense level0 来使能特性,其中:
--compress-dense level0只进行Vector压缩,节省空间较少,但是带来较小的性能开销。--compress-dense level1在进行Vector压缩的同时,将部分结果swap到CPU上,节省空间较多,但是带来较大的性能开销。
因此,建议在使用该特性时根据matmul的可掩盖时长来适当选择。
更一般的情况
事实上,针对任意激活值都可以使用我们的多流异步的方法进行压缩,以达到较小代价便可节省激活值内存的目的。前提是需要找到适合做多流掩盖的计算块,比如大颗粒的矩阵乘(针对tensor的压缩仅用到aicore中的vector计算单元),或者未掩盖的通信部分。并且需要存在重复多层的情况,以便进行层级错位的压缩和解压缩。我们的方法也被抽象为3个步骤,在代码合适的地方加入三个步骤即可:
#1.Create an activation value compression management object and prepare a function for asynchronous compression.
self.ac = ActivationCompress(*args)
self.ac.compress_and_wait_decompress_async_for_previous_layer(x)
#matmul operator
#2.Prepare the asynchronous decompression function.
self.ac.decompress_and_wait_compress_async_for_previous_layer(x)
#some operator
#3.Record the forward and reverse sequence
self.ac.order_record(out)
以下是详细的使用示例:
注:要使用激活值压缩特性,建议修改范围集中在某个forward内部, 这样可以尽量避免跨层耦合与绑定。
如下为一个简单模型的使用说明,假设我们希望对模型中第一个matmul产生的激活值进行无损压缩:
定义一个简单三层神经网络 SimpleModel,其中每一层由SimpleLayer构成,每个Layer为 Linear + relu + Linear 模式。可以通过简单的四个步骤,使能我们的多流异步激活值压缩。
第一步,在第一个Linear层之前,实例化一个ActivationCompress类并且调用compress_and_wait_decompress_async_for_previous_layer 函数,用于初始化compress tensor相关类以及开始压缩, 其中:
train_args:是一个训练中全局参数对象,如果是在Megatron训练框架下,直接传入get_args()即可。"simplelayer_ctm":是任意字符串,是对激活值压缩类的命名,推荐使用 "xx_ctm" 的模式。
第二步:在Liner层后,增加 decompress_and_wait_compress_async_for_previous_layer 函数,用于异步压缩的等待以及解压缩的触发, 接受的参数即为要压缩的对象(放在下一层压缩,因此不会影响当前层的计算)。
第三步:插入 order_record 函数,用于记录压缩顺序。
class SimpleLayer(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(SimpleLayer, self).__init__()
self.linear1 = nn.Linear(input_size, hidden_size)
self.linear2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
#1.Create an activation value compression management object and prepare a function for asynchronous compression.
if not hasattr(self, "ac"):
self.ac = ActivationCompress(train_args, "simplelayer_ctm")
self.ac.compress_and_wait_decompress_async_for_previous_layer(x)
out = self.linear1(x)
#2.Prepare the asynchronous decompression function.
self.ac.decompress_and_wait_compress_async_for_previous_layer(out)
out = F.relu(out)
#3.Record the forward and reverse sequence
self.ac.order_record(out)
out = self.linear2(out)
return out
class SimpleModel(nn.Module):
def __init__(self, input_size=64, hidden_size=128, output_size=64):
super(SimpleModel, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
self.dense_layer = SimpleLayer(input_size, hidden_size, output_size)
self.model = nn.Sequential(self.dense_layer, self.dense_layer, self.dense_layer)
def forward(self, x):
out = self.model(x)
return out
可参考文件:MindSpeed/tests_extend/unit_tests/features/compress_dense/test_compress_tensor.py
注意事项
- 该特性匹配的
CANN和torch_npu版本会在2025年第二季度发布版本。 - 当前和
recompute_activation_function特性暂不兼容。