NCCL代码阅读-06

RunWorkColl
- 书接上文。RunWorkBatch走到了这里:
1
RunWorkColl<Fn, T, RedOp, Algo, Proto>().run(tid, subtn, work)
- 其中work是一个ncclDevWorkColl
- 然后按照模版实例化的参数(这边以LL128为例),进入了这个函数:
1
2
3
4
5
6template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncAllReduce, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_LL128> {
__device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
runRing<T, RedOp, ProtoLL128>(tid, nthreads, work);
}
};
runRing
1 | template<typename T, typename RedOp, typename Proto> |
- 这个函数里面东西比较多,我们一点点拿出来看
- 下面解释ncclCollCdbPart,prims的初始化,directSend
ncclCollCbdPart
1 | template<typename Int> |
primitives里面有哪些东西
类内变量
1 | static constexpr int MaxRecv = Fan::MaxRecv, MaxSend = Fan::MaxSend;// 由传递过来的Fan决定 |
prims的初始化
- 在runRing里面,对prims进行了初始化:
1
2Primitives<T, RedOp, FanSymmetric<1>, 1, Proto, 0> prims
(tid, nthreads, &ring->prev, &ring->next, work->sendbuff, work->recvbuff, work->redOpArg, 0, 0, 0, work); - 寻找到对应的构造函数,在prims_ll128里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40__device__ Primitives(
const int tid, const int nthreads, int const *recvPeers, int const *sendPeers,
void const *inputBuf, void *outputBuf, uint64_t redOpArg, uint8_t group=0,
uint8_t connIndexRecv=0, uint8_t connIndexSend=0, struct ncclDevWorkColl* e = nullptr,
bool ipcReg = false, bool netReg = false, int stepSize_ = 0
):
redOp(redOpArg),
// wid是在当前warp中线程的编号
// warp是当前是第几个warp
tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), warp(tid/WARP_SIZE),
// warpInBlock是当前线程在当前线程块内属于第几个Warp
warpInBlock(threadIdx.x/WARP_SIZE),
flagThread((tid%8)==7), group(group),
// 步长是多少个元素
stepSize(ncclShmem.comm.buffSizes[NCCL_PROTO_LL128]/NCCL_STEPS/sizeof(uint64_t)) {
auto *channel = &ncclShmem.channel;
// 最初初始化时使用的是对称的通信模式(FanSymmetric),即接收和发送的对端节点数量是相等的
// nrecv和nsend就是实际接收和发送的对端节点数量,MaxRecv是在初始化的时候FanSymmetric<1>指定的
int nrecv=0, nsend=0;
while (nrecv < MaxRecv && recvPeers[nrecv] >= 0) {
loadRecvConn(&channel->peers[recvPeers[nrecv]]->recv[connIndexRecv], nrecv);
nrecv++;
}
while (nsend < MaxSend && sendPeers[nsend] >= 0) {
loadSendConn(&channel->peers[sendPeers[nsend]]->send[connIndexSend], nsend);
nsend++;
}
this->fan = Fan(nrecv, nsend);
// Coverity reports recvConn and sendConn being possibly NULL at this point but that won't actually
// happen given the two "while" loops just above.
// coverity[var_deref_model:FALSE]
loadRecvSync();
// coverity[var_deref_model:FALSE]
loadSendSync();
// inputBuf 和 outputBuf 就是最初nccl-test里面传进来的sendbuff和recvbuff的地址
// sendbuff是填充了要发送的数据,recvbuff应该全0,在AllReduceInitData里可以看到
// 最后把地址填充到userBufs里面
setDataPtrs(inputBuf, outputBuf);
} - 这里大概是一个怎样的结构呢?我梳理了一下:
- 我们假设有A,B,C三个节点
- A和B,A和C通信使用的是同一个channel,我这边觉得翻译成频段是更有助于理解的
- channel(实际类里面应该是ncclDevChannel)里面有一个ncclDevChannelPeer的指针数组peers,也就是说,channel.peers[0]是指向一个ncclDevChannelPeer(比如B节点)的指针,channel.peers[1]是指向另一个ncclDevChannelPeer(比如C节点)的指针
- recvPeers指示了peers的下标,如A的recvPeers为[0,1,-1],0就是B节点,1就是C节点
- nrecv指当前节点当前channel的接收连接总数,loadRecvConn里面也把他作为接收连接的下标
loadRecvConn
1 | __device__ __forceinline__ void loadRecvConn(struct ncclConnInfo* conn, int i) { |
- ncclConnInfo里面,有一个buffs数组,里面存放着指向不同原语协议的buffer的指针
- recvBuff存放这个节点这个channel连接的所有接收连接的buff地址
- 只在wid等于该接收连接下标的warp里执行
prims.directSend
- 在中间的几个步骤里,执行了
1
prims.directSend(offset, offset, nelem);
- 一路往下找,在primitives里面:
1
2
3
4
5
6
7template<typename RealPrimitives>
struct PrimitivesWithoutDirect {
__device__ void directSend(intptr_t inpIx, intptr_t outIx, int eltN) {
static_cast<RealPrimitives*>(this)->send(inpIx, eltN);
}
//...
} - 在prims_ll128里面:
1
2
3__device__ void send(intptr_t inpIx, int eltN) {
return GenericOp<0, 1, Input, -1>(inpIx, -1, eltN, false);
}
GenericOp
1 | template <int RECV, int SEND, int SrcBuf, int DstBuf> |
生产者-消费者模型在 NCCL 的实现
在 NCCL(NVIDIA Collective Communication Library)中,生产者-消费者模型用于管理点对点(P2P)通信。生产者-消费者模型的基本思想是,生产者(发送方)将数据写入缓冲区,消费者(接收方)从缓冲区读取数据,并通过步长指针(head
和 tail
)来协调通信双方的进度。
在 NCCL 中,这种模型通过环形缓冲区(Ring Buffer)来实现,用于同步发送方和接收方之间的通信状态。
1. 环形缓冲区的基本概念
缓冲区的组成
- NCCL 的环形缓冲区由共享内存或设备内存实现,逻辑上由
NCCL_STEPS
个步长块(step size blocks)组成。 - 每个步长块的大小为
stepSize
,计算公式:1
stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS;
- 缓冲区的总大小:
1
总大小 = stepSize × NCCL_STEPS
关键指针
**
head
**:- 由 发送方 维护。
- 表示当前发送方已经发送到的位置。
- 发送方通过更新
head
通知接收方新数据已写入缓冲区。
**
tail
**:- 由 接收方 维护。
- 表示当前接收方已经读取到的位置。
- 接收方通过更新
tail
通知发送方哪些数据已经被处理,可以重新覆盖。
流控机制
- 发送方约束:只有当
(head - tail) < 缓冲区大小
时,发送方才可以继续写入数据。 - 接收方约束:只有当
head > tail
时,接收方才可以读取数据。
2. 生产者-消费者模型的实现细节
(1) 发送方(生产者)的行为
在发送方代码中,p2pSendSetup
和 p2pSendConnect
函数负责设置发送方的连接和缓冲区。以下是发送方的生产行为的具体实现:
1 | send->conn.head = &resources->sendDevMem->head; // 发送方的头指针 |
**发送方的
head
**:- 发送方使用
head
来记录当前已经写入的数据步长。 - 每次发送数据后,发送方更新
head
,通知接收方新数据已经可用。
- 发送方使用
**接收方的
tail
**:- 发送方通过读取
tail
知道接收方已经处理了哪些数据。 - 如果
(head - tail) >= 缓冲区大小
,发送方需要等待,直到接收方更新tail
。
- 发送方通过读取
(2) 接收方(消费者)的行为
在接收方代码中,p2pRecvSetup
和 p2pRecvConnect
函数负责设置接收方的连接和缓冲区。以下是接收方的消费行为的具体实现:
1 | recv->conn.head = &remDevMem->head; // 发送方的头指针 |
**接收方的
head
**:- 接收方通过读取发送方的
head
知道有哪些新数据已经写入缓冲区。 - 只有当
head > tail
时,接收方才可以读取数据。
- 接收方通过读取发送方的
**接收方的
tail
**:- 接收方使用
tail
来记录自己已经读取并处理到的步长。 - 每次处理完数据后,接收方更新
tail
,通知发送方可以覆盖已处理的数据。
- 接收方使用
3. 流控实现
发送方流控
发送方在生产数据时,需要确保缓冲区有足够的可用空间。流控逻辑如下:
- 通过
send->conn.tail
(接收方的tail
)获取接收方的处理进度。 - 确保
(head - tail) < 缓冲区大小
,否则需要等待接收方释放空间。 - 写入数据后,更新
send->conn.head
(发送方的head
)。
接收方流控
接收方在消费数据时,需要确保缓冲区中有未处理的数据。流控逻辑如下:
- 通过
recv->conn.head
(发送方的head
)获取发送方的写入进度。 - 确保
head > tail
,否则需要等待发送方写入新数据。 - 读取数据后,更新
recv->conn.tail
(接收方的tail
)。
4. 模式选择
直接通信模式
1 | if (P2P_SAME_PID(myInfo, peerInfo) && ncclParamP2pDirectDisable() == 0 && useMemcpy == 0) { |
- 如果发送方和接收方位于同一个进程,且支持直接指针访问,则使用直接通信模式(
P2P_DIRECT
)。 - 直接模式下,缓冲区可以直接通过 GPU 的设备内存共享。
IPC(进程间通信)模式
1 | resources->type = P2P_IPC; |
- 如果发送方和接收方位于不同进程,则使用 CUDA IPC 机制。
- IPC 模式通过共享缓冲区的 CUDA IPC 句柄实现进程间通信。
Memcpy 模式
1 | if (useMemcpy) { |
- 如果硬件或架构不支持直接指针访问,则使用
memcpy
模式。 - 在此模式下,通信通过共享内存区域完成,代理线程负责复制数据。
5. 代码中的步长管理
步长对齐
1 | ALIGN_SIZE(sendSize, CUDA_IPC_MIN); |
- 发送和接收的缓冲区大小会被对齐到最小 IPC 内存块大小(
CUDA_IPC_MIN
)。 - 这样可以确保缓冲区满足 CUDA IPC 和直接通信的硬件要求。
分步传输
1 | send->conn.stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS; |
- 每次传输的步长为
stepSize
,是缓冲区大小的一个子块。 stepSize
确保数据可以分块传输,便于缓冲区的管理和流控。
6. 总结
生产者(发送方):
- 使用
head
来记录写入进度。 - 在写入数据之前检查
tail
,确保缓冲区有足够的空间。 - 写入数据后更新
head
,通知接收方。
- 使用
消费者(接收方):
- 使用
tail
来记录读取进度。 - 在读取数据之前检查
head
,确保缓冲区中有数据可读。 - 读取数据后更新
tail
,通知发送方可以覆盖已读取的数据。
- 使用
模式选择:
- 根据发送方和接收方的关系(同进程、跨进程)选择不同的通信模式:
- P2P_DIRECT:直接指针访问。
- P2P_IPC:CUDA IPC。
- Memcpy:通过共享内存和代理线程完成数据传输。
- 根据发送方和接收方的关系(同进程、跨进程)选择不同的通信模式:
通过这些机制,NCCL 实现了一个高效的生产者-消费者模型,用于管理点对点通信中的数据传输和同步。
1 | __device__ __forceinline__ void directSend(intptr_t inpIx, intptr_t outIx, int eltN) { |