前言
介绍accelerator.gather函数和accelerator.wait_for_everyone函数
Operating System: Ubuntu 22.04.4 LTS
accelerator.gather
accelerator.gather 是 Hugging Face 的 Accelerate 库中的一个实用函数,主要用于在分布式训练(如多 GPU 或多节点训练)中将张量(tensor)从所有进程收集到主进程中。这对于在训练过程中汇总结果、进行评估或记录日志非常有用。
📌 简介
在使用 Accelerator 进行分布式训练时,每个设备(比如每个 GPU)都会有自己的数据副本。当你想将这些分散在不同设备上的张量统一收集到一个地方(通常是主进程),就可以使用 accelerator.gather()。
🧠 作用
- 将多个设备上的张量合并成一个完整的张量。
- 支持自动处理不同设备上的张量(如 GPU 0, GPU 1 等)。
- 常用于:
- 收集模型预测结果
- 汇总 loss、accuracy 等指标
- 日志记录和可视化
✅ 使用示例
from accelerate import Accelerator
accelerator = Accelerator()
# 假设这是某个模型的输出,在每个设备上都有一个局部 batch
local_tensor = torch.tensor([1, 2, 3]).to(accelerator.device)
# 收集所有设备上的 tensor 到主进程
gathered_tensor = accelerator.gather(local_tensor)
if accelerator.is_main_process:
print("Gathered tensor:", gathered_tensor)
输出示例(假设两个 GPU):
Gathered tensor: tensor([1, 2, 3, 1, 2, 3])
🔍 参数说明
accelerator.gather(tensor, axis=0)
| 参数 | 类型 | 描述 |
|---|---|---|
tensor |
torch.Tensor |
要收集的本地张量 |
axis |
int |
在哪个维度上拼接,默认是 0 |
⚠️ 注意事项
gather不会自动平均或者减少(reduce),它只是把所有张量按axis维度拼接在一起。- 如果你想做的是求平均或求和,可以考虑使用
accelerator.reduce()。 - 只有主进程 (
is_main_process) 需要处理最终结果,避免重复计算或输出。
🧩 和其他方法的区别
| 方法名 | 功能 |
|---|---|
gather |
把所有进程的张量收集到主进程 |
reduce |
对所有进程的张量执行 reduce 操作(如 sum, mean) |
pad_across_processes |
补齐不同进程的数据长度以便 gather |
📘 示例:结合 Dataloader 使用
from torch.utils.data import DataLoader, TensorDataset
import torch
# 创建 dummy 数据
dataset = TensorDataset(torch.arange(8))
dataloader = DataLoader(dataset, batch_size=2)
# 使用 Accelerator 包装 dataloader
accelerator = Accelerator()
dataloader = accelerator.prepare(dataloader)
all_preds = []
for batch in dataloader:
# 假设模型前向传播的结果是一个 tensor
preds = batch[0] * 2 # 模拟预测
all_preds.append(accelerator.gather(preds))
if accelerator.is_main_process:
full_preds = torch.cat(all_preds)
print("Full predictions:", full_preds)
📚 相关文档
accelerator.wait_for_everyone
accelerator.wait_for_everyone() 是 Hugging Face 的 Accelerate 库中的一个实用函数,用于在分布式训练中同步所有进程。它确保所有设备(如多个 GPU 或多个节点)都执行到某个点后才继续运行后续代码。
🎯 作用
在分布式训练中,不同进程可能因为各种原因(比如计算速度不一致、数据加载时间差异等)导致进度不同步。
使用 wait_for_everyone() 可以让所有进程在此处等待彼此,防止出现“某些进程已经进入下一步而其他进程还没完成”的问题。
✅ 简单来说:
accelerator.wait_for_everyone()
这行代码会阻塞当前进程,直到所有其他参与训练的进程也执行到这里。
🧠 使用场景
- 模型保存前检查或预处理
- 某些操作需要所有进程先完成自己的任务。
- 日志记录、评估阶段
- 防止主进程提前进行汇总或写入日志。
- 多进程数据处理
- 在分片处理数据时,保证每个进程都完成了自己的部分。
- 调试时查看中间结果
- 避免因进程异步导致输出混乱。
✅ 示例代码
from accelerate import Accelerator
import torch
import time
accelerator = Accelerator()
if accelerator.is_main_process:
print("Main process starting...")
# 模拟各进程工作时间不同
rank = accelerator.process_index
time.sleep(rank) # rank=0 主进程最快,rank=1次之...
print(f"Process {rank} is working...")
# 同步所有进程
accelerator.wait_for_everyone()
print(f"Process {rank} continues after wait_for_everyone().")
输出示例(2个进程):
Main process starting...
Process 0 is working...
Process 1 is working...
Process 0 continues after wait_for_everyone().
Process 1 continues after wait_for_everyone().
可以看到,尽管进程 0 更快完成前面的任务,但它会在 wait_for_everyone() 处等待进程 1 完成。
🔍 内部原理简要说明
该方法底层调用了 PyTorch 分布式通信的 torch.distributed.barrier() 函数,这是一个标准的同步机制,用于协调所有分布式进程的执行顺序。
⚠️ 注意事项
- 一定要在所有进程中都调用这个函数,否则程序会卡住。
- 不要在条件语句中只对主进程调用它,例如下面这种写法是错误的:
if accelerator.is_main_process:
accelerator.wait_for_everyone() # ❌ 错误!其他进程不会执行,程序会卡死
📚 相关方法
| 方法 | 描述 |
|---|---|
accelerator.wait_for_everyone() |
所有进程在此处同步 |
accelerator.gather() |
收集张量到主进程 |
accelerator.reduce() |
对张量做跨进程归约(如求和、平均) |
📝 总结
accelerator.wait_for_everyone() 是一个非常有用的同步工具,在分布式训练中可以确保各个进程之间保持一致的执行状态。合理使用它可以避免很多由于异步执行带来的问题,特别是在数据处理、模型评估、日志记录等阶段。
如果你正在编写一个复杂的分布式训练流程,建议在关键步骤加入此函数以确保稳定性。
完整的分布式训练+同步的 demo
下面是一个 使用 Hugging Face Accelerate 的完整分布式训练 + 同步操作的示例。这个例子包含:
- 多 GPU 分布式训练(支持单机多卡)
- 使用
accelerator.gather()收集预测结果 - 使用
accelerator.wait_for_everyone()进行同步 - 使用
accelerator.prepare()自动处理模型和数据加载器
🧪 环境准备
确保你已安装以下库:
pip install accelerate torch transformers
然后你可以用如下命令启动分布式训练(例如:2 个 GPU):
accelerate launch --num_processes=2 train_demo.py
📄 示例代码:train_demo.py
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn import CrossEntropyLoss
from torch.optim import AdamW
from accelerate import Accelerator
from tqdm import tqdm
# =============================
# 1. 定义一个简单的 Dataset
# =============================
class DummyDataset(Dataset):
def __init__(self, num_samples=64, dim=10, num_classes=2):
self.inputs = torch.randn(num_samples, dim)
self.labels = torch.randint(0, num_classes, (num_samples,))
def __len__(self):
return len(self.inputs)
def __getitem__(self, idx):
return self.inputs[idx], self.labels[idx]
# =============================
# 2. 定义一个简单模型
# =============================
class SimpleModel(torch.nn.Module):
def __init__(self, input_dim=10, hidden_dim=32, output_dim=2):
super().__init__()
self.net = torch.nn.Sequential(
torch.nn.Linear(input_dim, hidden_dim),
torch.nn.ReLU(),
torch.nn.Linear(hidden_dim, output_dim)
)
def forward(self, x):
return self.net(x)
# =============================
# 3. 主训练函数
# =============================
def train():
# 初始化 Accelerator 实例
accelerator = Accelerator()
# 创建 dataset 和 dataloader
dataset = DummyDataset()
dataloader = DataLoader(dataset, batch_size=8)
# 模型、优化器、dataloader 会自动适配分布式设置
model = SimpleModel()
optimizer = AdamW(model.parameters(), lr=3e-4)
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
# 开始训练
model.train()
for epoch in range(3): # 训练 3 轮
all_preds = []
all_labels = []
progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}", disable=not accelerator.is_local_main_process)
for inputs, labels in progress_bar:
outputs = model(inputs)
loss = CrossEntropyLoss()(outputs, labels)
# 反向传播
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
# 收集每个 batch 的预测结果
preds = torch.argmax(outputs, dim=-1)
gathered_preds = accelerator.gather(preds)
gathered_labels = accelerator.gather(labels)
# 保存到主进程中用于后续计算指标
if accelerator.is_main_process:
all_preds.append(gathered_preds)
all_labels.append(gathered_labels)
# 更新进度条
progress_bar.set_postfix(loss=loss.item())
# 所有进程同步,防止主进程提前进入下一轮或退出
accelerator.wait_for_everyone()
# 主进程进行评估/打印
if accelerator.is_main_process:
full_preds = torch.cat(all_preds)
full_labels = torch.cat(all_labels)
accuracy = (full_preds == full_labels).float().mean().item()
print(f"Epoch {epoch+1} - Accuracy: {accuracy:.4f}")
# 主进程保存模型
if accelerator.is_main_process:
accelerator.save_state(output_dir="./trained_model")
# =============================
# 4. 启动训练
# =============================
if __name__ == "__main__":
train()
📝 说明与解释
| 功能 | 说明 |
|---|---|
accelerator.prepare() |
自动包装模型、优化器、数据加载器以支持分布式训练 |
accelerator.backward(loss) |
替代 loss.backward(),兼容梯度同步 |
accelerator.gather() |
收集各个设备上的张量到主进程 |
accelerator.wait_for_everyone() |
所有进程在此处等待彼此完成当前任务,再继续 |
accelerator.is_main_process |
判断是否是主进程,避免重复输出或保存模型 |
accelerator.save_state() |
保存完整的训练状态(模型、优化器等) |
🚀 如何运行?
单机多卡(比如 2 个 GPU):
accelerate launch --num_processes=2 train_demo.py
如果你只有一张 GPU,也可以运行它,会自动退化为单卡模式。
✅ 输出示例(主进程):
Epoch 1 - Accuracy: 0.5469
Epoch 2 - Accuracy: 0.6250
Epoch 3 - Accuracy: 0.7188
📌 小结
这个 Demo 展示了如何在 Hugging Face Accelerate 下进行:
- 分布式训练
- 张量收集(
gather) - 进程同步(
wait_for_everyone) - 主进程控制输出与保存模型
结语
第三百三十四篇博文写完,开心!!!!
今天,也是充满希望的一天。