00334 accelerator.gather和accelerator.wait_for_everyone


前言

介绍accelerator.gather函数和accelerator.wait_for_everyone函数

Operating System: Ubuntu 22.04.4 LTS

accelerator.gather

accelerator.gather 是 Hugging Face 的 Accelerate 库中的一个实用函数,主要用于在分布式训练(如多 GPU 或多节点训练)中将张量(tensor)从所有进程收集到主进程中。这对于在训练过程中汇总结果、进行评估或记录日志非常有用。


📌 简介

在使用 Accelerator 进行分布式训练时,每个设备(比如每个 GPU)都会有自己的数据副本。当你想将这些分散在不同设备上的张量统一收集到一个地方(通常是主进程),就可以使用 accelerator.gather()


🧠 作用

  • 将多个设备上的张量合并成一个完整的张量。
  • 支持自动处理不同设备上的张量(如 GPU 0, GPU 1 等)。
  • 常用于:
    • 收集模型预测结果
    • 汇总 loss、accuracy 等指标
    • 日志记录和可视化

✅ 使用示例

from accelerate import Accelerator

accelerator = Accelerator()

# 假设这是某个模型的输出,在每个设备上都有一个局部 batch
local_tensor = torch.tensor([1, 2, 3]).to(accelerator.device)

# 收集所有设备上的 tensor 到主进程
gathered_tensor = accelerator.gather(local_tensor)

if accelerator.is_main_process:
    print("Gathered tensor:", gathered_tensor)

输出示例(假设两个 GPU):

Gathered tensor: tensor([1, 2, 3, 1, 2, 3])

🔍 参数说明

accelerator.gather(tensor, axis=0)
参数 类型 描述
tensor torch.Tensor 要收集的本地张量
axis int 在哪个维度上拼接,默认是 0

⚠️ 注意事项

  • gather 不会自动平均或者减少(reduce),它只是把所有张量按 axis 维度拼接在一起。
  • 如果你想做的是求平均求和,可以考虑使用 accelerator.reduce()
  • 只有主进程 (is_main_process) 需要处理最终结果,避免重复计算或输出。

🧩 和其他方法的区别

方法名 功能
gather 把所有进程的张量收集到主进程
reduce 对所有进程的张量执行 reduce 操作(如 sum, mean)
pad_across_processes 补齐不同进程的数据长度以便 gather

📘 示例:结合 Dataloader 使用

from torch.utils.data import DataLoader, TensorDataset
import torch

# 创建 dummy 数据
dataset = TensorDataset(torch.arange(8))
dataloader = DataLoader(dataset, batch_size=2)

# 使用 Accelerator 包装 dataloader
accelerator = Accelerator()
dataloader = accelerator.prepare(dataloader)

all_preds = []

for batch in dataloader:
    # 假设模型前向传播的结果是一个 tensor
    preds = batch[0] * 2  # 模拟预测
    all_preds.append(accelerator.gather(preds))

if accelerator.is_main_process:
    full_preds = torch.cat(all_preds)
    print("Full predictions:", full_preds)

📚 相关文档


accelerator.wait_for_everyone

accelerator.wait_for_everyone() 是 Hugging Face 的 Accelerate 库中的一个实用函数,用于在分布式训练中同步所有进程。它确保所有设备(如多个 GPU 或多个节点)都执行到某个点后才继续运行后续代码。


🎯 作用

在分布式训练中,不同进程可能因为各种原因(比如计算速度不一致、数据加载时间差异等)导致进度不同步。
使用 wait_for_everyone() 可以让所有进程在此处等待彼此,防止出现“某些进程已经进入下一步而其他进程还没完成”的问题。

✅ 简单来说:

accelerator.wait_for_everyone()

这行代码会阻塞当前进程,直到所有其他参与训练的进程也执行到这里。


🧠 使用场景

  1. 模型保存前检查或预处理
    • 某些操作需要所有进程先完成自己的任务。
  2. 日志记录、评估阶段
    • 防止主进程提前进行汇总或写入日志。
  3. 多进程数据处理
    • 在分片处理数据时,保证每个进程都完成了自己的部分。
  4. 调试时查看中间结果
    • 避免因进程异步导致输出混乱。

✅ 示例代码

from accelerate import Accelerator
import torch
import time

accelerator = Accelerator()

if accelerator.is_main_process:
    print("Main process starting...")

# 模拟各进程工作时间不同
rank = accelerator.process_index
time.sleep(rank)  # rank=0 主进程最快,rank=1次之...

print(f"Process {rank} is working...")

# 同步所有进程
accelerator.wait_for_everyone()

print(f"Process {rank} continues after wait_for_everyone().")

输出示例(2个进程):

Main process starting...
Process 0 is working...
Process 1 is working...
Process 0 continues after wait_for_everyone().
Process 1 continues after wait_for_everyone().

可以看到,尽管进程 0 更快完成前面的任务,但它会在 wait_for_everyone() 处等待进程 1 完成。


🔍 内部原理简要说明

该方法底层调用了 PyTorch 分布式通信的 torch.distributed.barrier() 函数,这是一个标准的同步机制,用于协调所有分布式进程的执行顺序。


⚠️ 注意事项

  • 一定要在所有进程中都调用这个函数,否则程序会卡住。
  • 不要在条件语句中只对主进程调用它,例如下面这种写法是错误的:
if accelerator.is_main_process:
    accelerator.wait_for_everyone()  # ❌ 错误!其他进程不会执行,程序会卡死

📚 相关方法

方法 描述
accelerator.wait_for_everyone() 所有进程在此处同步
accelerator.gather() 收集张量到主进程
accelerator.reduce() 对张量做跨进程归约(如求和、平均)

📝 总结

accelerator.wait_for_everyone() 是一个非常有用的同步工具,在分布式训练中可以确保各个进程之间保持一致的执行状态。合理使用它可以避免很多由于异步执行带来的问题,特别是在数据处理、模型评估、日志记录等阶段。

如果你正在编写一个复杂的分布式训练流程,建议在关键步骤加入此函数以确保稳定性。

完整的分布式训练+同步的 demo

下面是一个 使用 Hugging Face Accelerate 的完整分布式训练 + 同步操作的示例。这个例子包含:

  • 多 GPU 分布式训练(支持单机多卡)
  • 使用 accelerator.gather() 收集预测结果
  • 使用 accelerator.wait_for_everyone() 进行同步
  • 使用 accelerator.prepare() 自动处理模型和数据加载器

🧪 环境准备

确保你已安装以下库:

pip install accelerate torch transformers

然后你可以用如下命令启动分布式训练(例如:2 个 GPU):

accelerate launch --num_processes=2 train_demo.py

📄 示例代码:train_demo.py

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn import CrossEntropyLoss
from torch.optim import AdamW
from accelerate import Accelerator
from tqdm import tqdm

# =============================
# 1. 定义一个简单的 Dataset
# =============================
class DummyDataset(Dataset):
    def __init__(self, num_samples=64, dim=10, num_classes=2):
        self.inputs = torch.randn(num_samples, dim)
        self.labels = torch.randint(0, num_classes, (num_samples,))

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.labels[idx]

# =============================
# 2. 定义一个简单模型
# =============================
class SimpleModel(torch.nn.Module):
    def __init__(self, input_dim=10, hidden_dim=32, output_dim=2):
        super().__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.net(x)

# =============================
# 3. 主训练函数
# =============================
def train():
    # 初始化 Accelerator 实例
    accelerator = Accelerator()

    # 创建 dataset 和 dataloader
    dataset = DummyDataset()
    dataloader = DataLoader(dataset, batch_size=8)

    # 模型、优化器、dataloader 会自动适配分布式设置
    model = SimpleModel()
    optimizer = AdamW(model.parameters(), lr=3e-4)
    model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

    # 开始训练
    model.train()
    for epoch in range(3):  # 训练 3 轮
        all_preds = []
        all_labels = []

        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}", disable=not accelerator.is_local_main_process)
        for inputs, labels in progress_bar:
            outputs = model(inputs)
            loss = CrossEntropyLoss()(outputs, labels)

            # 反向传播
            accelerator.backward(loss)
            optimizer.step()
            optimizer.zero_grad()

            # 收集每个 batch 的预测结果
            preds = torch.argmax(outputs, dim=-1)
            gathered_preds = accelerator.gather(preds)
            gathered_labels = accelerator.gather(labels)

            # 保存到主进程中用于后续计算指标
            if accelerator.is_main_process:
                all_preds.append(gathered_preds)
                all_labels.append(gathered_labels)

            # 更新进度条
            progress_bar.set_postfix(loss=loss.item())

        # 所有进程同步,防止主进程提前进入下一轮或退出
        accelerator.wait_for_everyone()

        # 主进程进行评估/打印
        if accelerator.is_main_process:
            full_preds = torch.cat(all_preds)
            full_labels = torch.cat(all_labels)
            accuracy = (full_preds == full_labels).float().mean().item()
            print(f"Epoch {epoch+1} - Accuracy: {accuracy:.4f}")

    # 主进程保存模型
    if accelerator.is_main_process:
        accelerator.save_state(output_dir="./trained_model")

# =============================
# 4. 启动训练
# =============================
if __name__ == "__main__":
    train()

📝 说明与解释

功能 说明
accelerator.prepare() 自动包装模型、优化器、数据加载器以支持分布式训练
accelerator.backward(loss) 替代 loss.backward(),兼容梯度同步
accelerator.gather() 收集各个设备上的张量到主进程
accelerator.wait_for_everyone() 所有进程在此处等待彼此完成当前任务,再继续
accelerator.is_main_process 判断是否是主进程,避免重复输出或保存模型
accelerator.save_state() 保存完整的训练状态(模型、优化器等)

🚀 如何运行?

单机多卡(比如 2 个 GPU):

accelerate launch --num_processes=2 train_demo.py

如果你只有一张 GPU,也可以运行它,会自动退化为单卡模式。


✅ 输出示例(主进程):

Epoch 1 - Accuracy: 0.5469
Epoch 2 - Accuracy: 0.6250
Epoch 3 - Accuracy: 0.7188

📌 小结

这个 Demo 展示了如何在 Hugging Face Accelerate 下进行:

  • 分布式训练
  • 张量收集(gather
  • 进程同步(wait_for_everyone
  • 主进程控制输出与保存模型

结语

第三百三十四篇博文写完,开心!!!!

今天,也是充满希望的一天。


文章作者: LuYF-Lemon-love
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LuYF-Lemon-love !
  目录