NCCL代码阅读-06

Alice Yu Lv3

RunWorkColl

  • 书接上文。RunWorkBatch走到了这里:
    1
    RunWorkColl<Fn, T, RedOp, Algo, Proto>().run(tid, subtn, work)
  • 其中work是一个ncclDevWorkColl
  • 然后按照模版实例化的参数(这边以LL128为例),进入了这个函数:
    1
    2
    3
    4
    5
    6
    template<typename T, typename RedOp>
    struct RunWorkColl<ncclFuncAllReduce, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_LL128> {
    __device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
    runRing<T, RedOp, ProtoLL128>(tid, nthreads, work);
    }
    };

runRing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
template<typename T, typename RedOp, typename Proto>
__device__ __forceinline__ void runRing(int tid, int nthreads, struct ncclDevWorkColl* work) {
ncclRing *ring = &ncclShmem.channel.ring;
int ringIx = ring->index;
const int nranks = ncclShmem.comm.nRanks;
ssize_t gridOffset;
ssize_t channelCount;
ssize_t chunkCount;
// gridOffset:
// channelCount:当前通道要负责的总的元素数量
// chunkCount: 当前通道中一个chunk容纳的元素数量
ncclCollCbdPart(work, ncclShmem.channelId, Proto::Id, sizeof(T), (ssize_t*)nullptr, &gridOffset, &channelCount, &chunkCount);
const ssize_t loopCount = nranks * chunkCount;
ssize_t offset;
int nelem;
int chunk;

// Coverity reports that the callee treats &ring->next as an array. However, due to the use of
// FanSymmetric<1>, only the first element is ever accessed, so it's fine.
// coverity[callee_ptr_arith:FALSE]
Primitives<T, RedOp, FanSymmetric<1>, 1, Proto, 0> prims
(tid, nthreads, &ring->prev, &ring->next, work->sendbuff, work->recvbuff, work->redOpArg, 0, 0, 0, work);


for (ssize_t elemOffset = 0; elemOffset < channelCount; elemOffset += loopCount) {
ssize_t remCount = channelCount - elemOffset;
ssize_t chunkOffset;

if (remCount < loopCount) chunkCount = alignUp(divUp(remCount, nranks), 16/sizeof(T));

auto modRanks = [&]__device__(int r)->int {
return r - (r >= nranks ? nranks : 0);
};

// step 0: push data to next GPU
chunk = modRanks(ringIx + nranks - 1);
chunkOffset = chunk * chunkCount;
offset = gridOffset + elemOffset + chunkOffset;
nelem = (int)min(chunkCount, remCount - chunkOffset);
prims.directSend(offset, offset, nelem);

// k-2 steps: reduce and copy to next GPU
for (int j = 2; j < nranks; ++j) {
chunk = modRanks(ringIx + nranks - j);
chunkOffset = chunk * chunkCount;
offset = gridOffset + elemOffset + chunkOffset;
nelem = (int)min(chunkCount, remCount - chunkOffset);
prims.directRecvReduceDirectSend(offset, offset, nelem);
}

// step k-1: reduce this buffer and data, which will produce the final
// result that we store in this data and push to the next GPU
chunk = ringIx + 0;
chunkOffset = chunk * chunkCount;
offset = gridOffset + elemOffset + chunkOffset;
nelem = (int)min(chunkCount, remCount - chunkOffset);
prims.directRecvReduceCopyDirectSend(offset, offset, nelem, /*postOp=*/true);

// k-2 steps: copy to next GPU
for (int j = 1; j < nranks - 1; ++j) {
chunk = modRanks(ringIx + nranks - j);
chunkOffset = chunk * chunkCount;
offset = gridOffset + elemOffset + chunkOffset;
nelem = (int)min(chunkCount, remCount - chunkOffset);
prims.directRecvCopyDirectSend(offset, nelem);
}

// Make final copy from buffer to dest.
chunk = modRanks(ringIx + 1);
chunkOffset = chunk * chunkCount;
offset = gridOffset + elemOffset + chunkOffset;
nelem = (int)min(chunkCount, remCount - chunkOffset);

prims.directRecv(offset, offset, nelem);
}
}
  • 这个函数里面东西比较多,我们一点点拿出来看
  • 下面解释ncclCollCdbPart,prims的初始化,directSend

ncclCollCbdPart

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
template<typename Int>
__host__ __device__ inline void ncclCollCbdPart(
struct ncclDevWorkColl* work, uint32_t channelId, int proto, int eltSize,
Int* count, Int* partOffset, Int* partCount, Int* chunkCount
) {
/////////////////////////////////////////////
// proto: LL128
// eltSize: sizeof(uint64_t)
// count: 我们的例子是nullptr
/////////////////////////////////////////////

// 一个grain有几个数据元素
int eltPerGrain = ncclProtoGrainSize(proto)/eltSize;
int nMidChannels = work->channelHi - work->channelLo - 1;
// We can assum that nMidChannels<0 implies countMid==0, which let's us assume
// that countMid*nMidChannels == 0.
if (count != nullptr) {
*count = work->cbd.countLo + work->cbd.countMid*nMidChannels + work->cbd.countHi;
}
if (channelId == work->channelLo) {
*partOffset = 0;
// countLo是当前通道要负责的总的元素数量
*partCount = work->cbd.countLo;
// chunkGrainsLo: 低通道上的一个chunk可以容纳几个grain
// chunkCount: 当前通道上一个chunk里面的元素数量
*chunkCount = work->cbd.chunkGrainsLo*eltPerGrain;
} else if (channelId == work->channelHi) {
*partOffset = work->cbd.countLo + nMidChannels*work->cbd.countMid;
*partCount = work->cbd.countHi;
*chunkCount = work->cbd.chunkGrainsHi*eltPerGrain;
} else {
int mid = channelId - work->channelLo - 1;
*partOffset = work->cbd.countLo + mid*work->cbd.countMid;
*partCount = work->cbd.countMid;
*chunkCount = work->cbd.chunkGrainsMid*eltPerGrain;
}
}

primitives里面有哪些东西

类内变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static constexpr int MaxRecv = Fan::MaxRecv, MaxSend = Fan::MaxSend;// 由传递过来的Fan决定
static constexpr int Input=0, Output=1;
RedOp redOp;
const int tid; // thread index in primitives group
const int nthreads; // 调用prims的函数里面可以看到,这是执行这个work需要的总线程数(work里面确定了)
const int wid; // 在当前warp中线程的编号
const int stepSize;
const int warp; // 当前是第几个warp
const int warpInBlock; // 在这个线程块里面,这是第几个warp
const bool flagThread;
const int group;
Fan fan;
T *userBufs[2];// 两个指针,指向的类型是T
struct ncclConnInfo* recvConn = NULL;//在构造函数里填充
volatile uint64_t* recvConnHeadPtr = NULL;
uint64_t recvConnHead;

struct ncclConnInfo* sendConn = NULL;//在构造函数里填充
volatile struct ncclConnFifo* sendConnFifo = NULL;
volatile uint64_t* sendConnTailPtr = NULL;
uint64_t sendConnTail;
volatile uint64_t* sendConnHeadPtr = NULL;
uint64_t sendConnHead;
uint64_t sendConnHeadCache; // Cache last seen value

uint64_t recvStep[MaxRecv];//在构造函数里填充
uint64_t sendStep[MaxSend];//在构造函数里填充
uint64_t* recvBuff[MaxRecv];// 存接收缓冲区地址的数组,在构造函数里填充
uint64_t* sendBuff[MaxSend];// 存发送缓冲区地址的数组,在构造函数里填充

prims的初始化

  • 在runRing里面,对prims进行了初始化:
    1
    2
    Primitives<T, RedOp, FanSymmetric<1>, 1, Proto, 0> prims
    (tid, nthreads, &ring->prev, &ring->next, work->sendbuff, work->recvbuff, work->redOpArg, 0, 0, 0, work);
  • 寻找到对应的构造函数,在prims_ll128里:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    __device__ Primitives(
    const int tid, const int nthreads, int const *recvPeers, int const *sendPeers,
    void const *inputBuf, void *outputBuf, uint64_t redOpArg, uint8_t group=0,
    uint8_t connIndexRecv=0, uint8_t connIndexSend=0, struct ncclDevWorkColl* e = nullptr,
    bool ipcReg = false, bool netReg = false, int stepSize_ = 0
    ):
    redOp(redOpArg),
    // wid是在当前warp中线程的编号
    // warp是当前是第几个warp
    tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), warp(tid/WARP_SIZE),
    // warpInBlock是当前线程在当前线程块内属于第几个Warp
    warpInBlock(threadIdx.x/WARP_SIZE),
    flagThread((tid%8)==7), group(group),
    // 步长是多少个元素
    stepSize(ncclShmem.comm.buffSizes[NCCL_PROTO_LL128]/NCCL_STEPS/sizeof(uint64_t)) {
    auto *channel = &ncclShmem.channel;
    // 最初初始化时使用的是对称的通信模式(FanSymmetric),即接收和发送的对端节点数量是相等的
    // nrecv和nsend就是实际接收和发送的对端节点数量,MaxRecv是在初始化的时候FanSymmetric<1>指定的
    int nrecv=0, nsend=0;
    while (nrecv < MaxRecv && recvPeers[nrecv] >= 0) {
    loadRecvConn(&channel->peers[recvPeers[nrecv]]->recv[connIndexRecv], nrecv);
    nrecv++;
    }
    while (nsend < MaxSend && sendPeers[nsend] >= 0) {
    loadSendConn(&channel->peers[sendPeers[nsend]]->send[connIndexSend], nsend);
    nsend++;
    }
    this->fan = Fan(nrecv, nsend);
    // Coverity reports recvConn and sendConn being possibly NULL at this point but that won't actually
    // happen given the two "while" loops just above.
    // coverity[var_deref_model:FALSE]
    loadRecvSync();
    // coverity[var_deref_model:FALSE]
    loadSendSync();
    // inputBuf 和 outputBuf 就是最初nccl-test里面传进来的sendbuff和recvbuff的地址
    // sendbuff是填充了要发送的数据,recvbuff应该全0,在AllReduceInitData里可以看到
    // 最后把地址填充到userBufs里面
    setDataPtrs(inputBuf, outputBuf);
    }

  • 这里大概是一个怎样的结构呢?我梳理了一下:
    • 我们假设有A,B,C三个节点
    • A和B,A和C通信使用的是同一个channel,我这边觉得翻译成频段是更有助于理解的
    • channel(实际类里面应该是ncclDevChannel)里面有一个ncclDevChannelPeer的指针数组peers,也就是说,channel.peers[0]是指向一个ncclDevChannelPeer(比如B节点)的指针,channel.peers[1]是指向另一个ncclDevChannelPeer(比如C节点)的指针
    • recvPeers指示了peers的下标,如A的recvPeers为[0,1,-1],0就是B节点,1就是C节点
    • nrecv指当前节点当前channel的接收连接总数,loadRecvConn里面也把他作为接收连接的下标

loadRecvConn

1
2
3
4
5
6
__device__ __forceinline__ void loadRecvConn(struct ncclConnInfo* conn, int i) {
// 这里conn->buffs的值,即接收缓冲区(以及发送缓冲区)的地址,是在p2p.cc里面,初始化的时候就填写好的,接收缓冲区的地址是
recvBuff[i] = (union ncclLLFifoLine*)conn->buffs[NCCL_PROTO_LL];
recvStep[i] = conn->step;
if (wid == i) recvConn = conn;
}
  • ncclConnInfo里面,有一个buffs数组,里面存放着指向不同原语协议的buffer的指针
  • recvBuff存放这个节点这个channel连接的所有接收连接的buff地址
  • 只在wid等于该接收连接下标的warp里执行

prims.directSend

  • 在中间的几个步骤里,执行了
    1
    prims.directSend(offset, offset, nelem);
  • 一路往下找,在primitives里面:
    1
    2
    3
    4
    5
    6
    7
    template<typename RealPrimitives>
    struct PrimitivesWithoutDirect {
    __device__ void directSend(intptr_t inpIx, intptr_t outIx, int eltN) {
    static_cast<RealPrimitives*>(this)->send(inpIx, eltN);
    }
    //...
    }
  • 在prims_ll128里面:
    1
    2
    3
    __device__ void send(intptr_t inpIx, int eltN) {
    return GenericOp<0, 1, Input, -1>(inpIx, -1, eltN, false);
    }

GenericOp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
template <int RECV, int SEND, int SrcBuf, int DstBuf>
__device__ __forceinline__ void GenericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
//////////////////////////////////////////
// RECV=0, SEND=1, SrcBuf=Input=0, DstBuf=-1
// srcIx=inpIx(offset), dstIx=-1, nelem=eltN(一个chunk的元素数), postOp=false
//////////////////////////////////////////
constexpr int SRC = SrcBuf != -1 ? 1 : 0;//1
constexpr int DST = DstBuf != -1 ? 1 : 0;//0
// 指向nccl-test一开始就分配好的sendbuff
T const *srcPtr = SrcBuf == -1 ? nullptr : userBufs[SrcBuf] + srcIx;
// 指向nccl-test一开始就分配好的recvbuff
T *dstPtr = DstBuf == -1 ? nullptr : userBufs[DstBuf] + dstIx;
// wire可以理解成逻辑上的系统总线,slice是最后传输的单位
int wireOffset = WireWordPerSlice*warp + 2*wid;
const int nwarps = nthreads/WARP_SIZE;
nelem = nelem < 0 ? 0 : nelem;

// 控制路径,是否时间很长?
if (SEND) waitSend(divUp(nelem, DataEltPerSlice)*WireWordPerSlice*sizeof(uint64_t));
barrier();
nelem -= DataEltPerSlice*warp;
srcPtr += DataEltPerSlice*warp;
dstPtr += DataEltPerSlice*warp;
while (nelem > 0) {
const int eltInSlice = min(nelem, DataEltPerSlice);
uint64_t regs[NCCL_LL128_SHMEM_ELEMS_PER_THREAD];
if (SRC) loadRegsBegin(regs, srcPtr, eltInSlice);
recvReduceSendCopy<NCCL_LL128_SHMEM_ELEMS_PER_THREAD, RECV, SEND, SrcBuf, DstBuf>(regs, wireOffset, postOp);
if (DST) storeRegs(dstPtr, regs, eltInSlice);

wireOffset += WireWordPerSlice*nwarps;
srcPtr += DataEltPerSlice*nwarps;
dstPtr += DataEltPerSlice*nwarps;
nelem -= DataEltPerSlice*nwarps;
}

barrier();
if (SEND) for (int i=0; i < MaxSend; i++) sendStep[i] += 1;
if (SEND) postSend();
if (RECV) for (int i=0; i < MaxRecv; i++) recvStep[i] += 1;
if (RECV) postRecv();
}

生产者-消费者模型在 NCCL 的实现

在 NCCL(NVIDIA Collective Communication Library)中,生产者-消费者模型用于管理点对点(P2P)通信。生产者-消费者模型的基本思想是,生产者(发送方)将数据写入缓冲区,消费者(接收方)从缓冲区读取数据,并通过步长指针(headtail)来协调通信双方的进度。

在 NCCL 中,这种模型通过环形缓冲区(Ring Buffer)来实现,用于同步发送方和接收方之间的通信状态。


1. 环形缓冲区的基本概念

缓冲区的组成

  • NCCL 的环形缓冲区由共享内存或设备内存实现,逻辑上由 NCCL_STEPS 个步长块(step size blocks)组成。
  • 每个步长块的大小为 stepSize,计算公式:
    1
    stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS;
  • 缓冲区的总大小:
    1
    总大小 = stepSize × NCCL_STEPS

关键指针

  1. **head**:

    • 发送方 维护。
    • 表示当前发送方已经发送到的位置。
    • 发送方通过更新 head 通知接收方新数据已写入缓冲区。
  2. **tail**:

    • 接收方 维护。
    • 表示当前接收方已经读取到的位置。
    • 接收方通过更新 tail 通知发送方哪些数据已经被处理,可以重新覆盖。

流控机制

  • 发送方约束:只有当 (head - tail) < 缓冲区大小 时,发送方才可以继续写入数据。
  • 接收方约束:只有当 head > tail 时,接收方才可以读取数据。

2. 生产者-消费者模型的实现细节

(1) 发送方(生产者)的行为

在发送方代码中,p2pSendSetupp2pSendConnect 函数负责设置发送方的连接和缓冲区。以下是发送方的生产行为的具体实现:

1
2
send->conn.head = &resources->sendDevMem->head; // 发送方的头指针
send->conn.tail = &remDevMem->tail; // 接收方的尾指针
  • **发送方的 head**:

    • 发送方使用 head 来记录当前已经写入的数据步长。
    • 每次发送数据后,发送方更新 head,通知接收方新数据已经可用。
  • **接收方的 tail**:

    • 发送方通过读取 tail 知道接收方已经处理了哪些数据。
    • 如果 (head - tail) >= 缓冲区大小,发送方需要等待,直到接收方更新 tail

(2) 接收方(消费者)的行为

在接收方代码中,p2pRecvSetupp2pRecvConnect 函数负责设置接收方的连接和缓冲区。以下是接收方的消费行为的具体实现:

1
2
recv->conn.head = &remDevMem->head;           // 发送方的头指针
recv->conn.tail = &resources->recvDevMem->tail; // 接收方的尾指针
  • **接收方的 head**:

    • 接收方通过读取发送方的 head 知道有哪些新数据已经写入缓冲区。
    • 只有当 head > tail 时,接收方才可以读取数据。
  • **接收方的 tail**:

    • 接收方使用 tail 来记录自己已经读取并处理到的步长。
    • 每次处理完数据后,接收方更新 tail,通知发送方可以覆盖已处理的数据。

3. 流控实现

发送方流控

发送方在生产数据时,需要确保缓冲区有足够的可用空间。流控逻辑如下:

  1. 通过 send->conn.tail(接收方的 tail)获取接收方的处理进度。
  2. 确保 (head - tail) < 缓冲区大小,否则需要等待接收方释放空间。
  3. 写入数据后,更新 send->conn.head(发送方的 head)。

接收方流控

接收方在消费数据时,需要确保缓冲区中有未处理的数据。流控逻辑如下:

  1. 通过 recv->conn.head(发送方的 head)获取发送方的写入进度。
  2. 确保 head > tail,否则需要等待发送方写入新数据。
  3. 读取数据后,更新 recv->conn.tail(接收方的 tail)。

4. 模式选择

直接通信模式

1
2
3
4
if (P2P_SAME_PID(myInfo, peerInfo) && ncclParamP2pDirectDisable() == 0 && useMemcpy == 0) {
resources->type = P2P_DIRECT;
send->conn.flags |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE;
}
  • 如果发送方和接收方位于同一个进程,且支持直接指针访问,则使用直接通信模式(P2P_DIRECT)。
  • 直接模式下,缓冲区可以直接通过 GPU 的设备内存共享。

IPC(进程间通信)模式

1
2
resources->type = P2P_IPC;
recv->conn.flags |= info->read ? NCCL_IPC_READ : NCCL_IPC_WRITE;
  • 如果发送方和接收方位于不同进程,则使用 CUDA IPC 机制。
  • IPC 模式通过共享缓冲区的 CUDA IPC 句柄实现进程间通信。

Memcpy 模式

1
2
3
4
if (useMemcpy) {
send->conn.tail = &resources->proxyInfo.ceRecvMem->tail;
send->conn.head = &resources->proxyInfo.devShm->sendMem.head;
}
  • 如果硬件或架构不支持直接指针访问,则使用 memcpy 模式。
  • 在此模式下,通信通过共享内存区域完成,代理线程负责复制数据。

5. 代码中的步长管理

步长对齐

1
2
ALIGN_SIZE(sendSize, CUDA_IPC_MIN);
ALIGN_SIZE(recvSize, CUDA_IPC_MIN);
  • 发送和接收的缓冲区大小会被对齐到最小 IPC 内存块大小(CUDA_IPC_MIN)。
  • 这样可以确保缓冲区满足 CUDA IPC 和直接通信的硬件要求。

分步传输

1
2
send->conn.stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS;
recv->conn.stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS;
  • 每次传输的步长为 stepSize,是缓冲区大小的一个子块。
  • stepSize 确保数据可以分块传输,便于缓冲区的管理和流控。

6. 总结

  1. 生产者(发送方)

    • 使用 head 来记录写入进度。
    • 在写入数据之前检查 tail,确保缓冲区有足够的空间。
    • 写入数据后更新 head,通知接收方。
  2. 消费者(接收方)

    • 使用 tail 来记录读取进度。
    • 在读取数据之前检查 head,确保缓冲区中有数据可读。
    • 读取数据后更新 tail,通知发送方可以覆盖已读取的数据。
  3. 模式选择

    • 根据发送方和接收方的关系(同进程、跨进程)选择不同的通信模式:
      • P2P_DIRECT:直接指针访问。
      • P2P_IPC:CUDA IPC。
      • Memcpy:通过共享内存和代理线程完成数据传输。

通过这些机制,NCCL 实现了一个高效的生产者-消费者模型,用于管理点对点通信中的数据传输和同步。

1
2
3
__device__ __forceinline__ void directSend(intptr_t inpIx, intptr_t outIx, int eltN) {
genericOp<0, 1, 0, 1, Input, -1>(inpIx, outIx, eltN, false);
}