NCCL Code Reading - 05

The AllReduce operation flow (starting from ncclLaunchKernel)
- Since my project essentially only uses allreduce, I focused on this operation
- Specifically, I use allreduce with a u32 sum (see the sketch at the end of this list)
- Everything before ncclLaunchKernel is much the same as the sendrecv flow recorded in "NCCL Code Reading - 01", so I won't re-explain it
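- For concreteness, here is a minimal sketch of the entry call this post follows (my illustration using the public NCCL API; sendbuff, recvbuff, count, comm, and stream are placeholders, and setup/error handling are omitted):
// A u32 sum AllReduce, queued asynchronously on `stream`.
ncclResult_t rc = ncclAllReduce(sendbuff, recvbuff, count,
                                ncclUint32, ncclSum, comm, stream);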
Preparation: the eve of launchKernel
- This part really belongs in 01, but placing it here makes it easier to understand concepts like work batches and tasks inside the kernel
taskAppend
- Recall that taskAppend is called from ncclEnqueueCheck, which takes a single argument called info; this info comes from ncclAllReduce (the topmost entry point for NCCL's AllReduce, the API that nccl-tests calls)
struct ncclInfo info = { ncclFuncAllReduce, "AllReduce",
  sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */
  ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
- Continuing from "NCCL Code Reading - nccl-tests", the parameters in info above will all look familiar if you have read that post
- Let's single out taskAppend for analysis here; this task gets passed all the way down into the kernel
- The analysis is written in the comments
// Convert info into an ncclTaskColl (a task) and append the task to comm->planner
static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo* info) {
  struct ncclKernelPlanner *planner = &comm->planner;
  if (info->coll == ncclFuncSend || info->coll == ncclFuncRecv) {
    // Our coll is ncclFuncAllReduce, so this branch is skipped in our example
  } else {
    // Empty collectives can be discarded.
    if (info->count == 0) return ncclSuccess;
    // Convert the host-side info->op (the reduction operator) into its
    // device-side format, stored in opDev for later use on the GPU
    struct ncclDevRedOpFull opDev;
    NCCLCHECK(hostToDevRedOp(&opDev, info->op, info->datatype, comm));
    if (comm->nRanks == 1) {
      // In our example comm->nRanks is 2, so this branch is not taken
    } else {
      // Must be in thread local group before tasks can be alloc'd in `comm->memScoped`.
      // In our case, each process's single comm forms a group by itself, and
      // ncclGroupCommHead points to that one comm in each process.
      // For groups, see "NCCL Code Reading - 02"
      ncclGroupCommJoin(info->comm);
      // Allocate an ncclTaskColl struct
      struct ncclTaskColl* t = ncclMemoryPoolAlloc<struct ncclTaskColl>(&comm->memPool_ncclTaskColl, &comm->memPermanent);
      t->func = info->coll;
      t->sendbuff = info->sendbuff;
      t->recvbuff = info->recvbuff;
      t->count = info->count;
      t->root = info->root;
      t->datatype = info->datatype;
      size_t elementSize = ncclTypeSize(t->datatype);
      if (t->func == ncclFuncAllGather || t->func == ncclFuncBroadcast) {
        t->count *= elementSize;
        t->datatype = ncclInt8;
        elementSize = 1;
      }
      // Worth stepping into: for AllReduce, ncclFuncTrafficPerByte is 2,
      // because every byte is sent out once and received back once
      t->trafficBytes = t->count*elementSize*ncclFuncTrafficPerByte(t->func, comm->nRanks);
      t->opHost = info->op;
      t->opDev = opDev; // C++ struct assignment
      t->chunkSteps = info->chunkSteps;
      t->sliceSteps = info->sliceSteps;
      // Update the running task count nTasksColl.
      // Insert the new task into the task queue ordered by traffic size;
      // ncclTaskCollSorterInsert sorts by the task's traffic bytes.
      planner->nTasksColl += 1;
      ncclTaskCollSorterInsert(&planner->collSorter, t, t->trafficBytes);
    }
  }
  if (info->stream != planner->streamRecent || planner->streams == nullptr) {
    planner->streamRecent = info->stream;
    struct ncclCudaStreamList* l = planner->streams;
    while (true) {
      if (l == nullptr) { // Got to the end, this must be a new stream.
        struct ncclCudaGraph graph;
        NCCLCHECK(ncclCudaGetCapturingGraph(&graph, info->stream));
        if (planner->streams != nullptr && !ncclCudaGraphSame(planner->capturingGraph, graph)) {
          WARN("Streams given to a communicator within a NCCL group must either be all uncaptured or all captured by the same graph.");
          return ncclInvalidUsage;
        }
        planner->capturingGraph = graph; // C++ struct assignment
        // Add stream to list
        l = ncclMemoryStackAlloc<struct ncclCudaStreamList>(&comm->memScoped);
        l->stream = info->stream;
        l->next = planner->streams;
        planner->streams = l;
        break;
      }
      if (l->stream == info->stream)
        break; // Already seen stream.
      l = l->next;
    }
  }
  return ncclSuccess;
}
- Let's pause here and take stock. After taskAppend, our task (an AllReduce over count elements) has been inserted into comm's planner; all of the command information lives there.
- From here on, everything operates at the granularity of a group; just remember that ncclGroupCommHead points to the first comm in the group, which in our scenario is each process's single comm
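- As a sanity check on the trafficBytes formula in the code above, here is a tiny standalone computation (my own illustration, not NCCL code):
#include <cstdio>
#include <cstddef>
int main() {
  size_t count = 1 << 20;          // 1M u32 elements
  size_t elementSize = 4;          // ncclTypeSize of a u32
  size_t trafficPerByte = 2;       // AllReduce: each byte goes out once, comes back once
  size_t trafficBytes = count * elementSize * trafficPerByte;
  printf("trafficBytes = %zu\n", trafficBytes); // 8388608, i.e. 8 MiB
  return 0;
}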
ncclGroupEndInternal
ncclResult_t ncclGroupEndInternal(ncclSimInfo_t* simInfo) { /* ... */ }
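- From the user's side, ncclGroupEndInternal is reached through ncclGroupEnd; a single ncclAllReduce call runs inside an implicit group. Explicit grouping looks roughly like this (a sketch; nComms, comms, streams, and the buffers are placeholders):
// Sketch: batching several collectives into one group. ncclGroupEnd() is
// what ends up in ncclGroupEndInternal and triggers groupLaunch.
ncclGroupStart();
for (int i = 0; i < nComms; i++) {
  ncclAllReduce(sendbufs[i], recvbufs[i], count, ncclUint32, ncclSum,
                comms[i], streams[i]);  // each call just queues a task
}
ncclGroupEnd();  // the tasks queued in each comm->planner launch here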
groupLaunch
- Next we enter doLaunches; the argument it carries is, plainly speaking, just the comm
- Before reading groupLaunch, first take a look at the ncclAsyncJob struct
- Also, the ncclGroupJob struct inherits from ncclAsyncJob (see the embedding sketch under the struct below)
ncclAsyncJob
struct ncclAsyncJob { /* ... */ };
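- The "inheritance" here is the usual C idiom of embedding the base struct as the first member. A sketch of the pattern (the fields besides base are illustrative, not NCCL's real layout):
// Because `base` sits at offset 0, a ncclGroupJob* can be handed to any code
// expecting a ncclAsyncJob*, and cast back inside the job's callback.
struct ncclGroupJob {
  struct ncclAsyncJob base;  // must come first to make the casts valid
  // ...group-specific fields follow...
};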
ncclPrepareTasks
- We also need to look at ncclPrepareTasks, which is called inside groupLaunch. In it:
  - nTasksPerChannel is determined
  - a series of steps (which I haven't analyzed in detail; I'll come back and fill them in if needed) pulls the tasks out of planner->collTaskQueue
  - two structures appear here: ncclDevWorkColl devWork and ncclWorkList* workNode (sketched after this list)
    - devWork is a struct (ncclDevWorkColl) holding the detailed description of a collective communication task
    - workNode is an ncclWorkList struct, containing a type field (workType), a size field (size), and the area that stores the task data
    - workNode carries devWork as its data portion
  - each task is converted into an ncclDevWorkColl, placed into a workNode, and finally enqueued:
ncclIntruQueueEnqueue(&planner->collWorkQueue, workNode)
- In my scenario, the two comms only need to initialize the algo channels the first time, so needConnect is true exactly once for each
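- The workNode/devWork relationship above is a header-plus-trailing-payload layout. Conceptually it looks like this (a sketch, not the exact NCCL definitions):
// Sketch: the payload (an ncclDevWorkColl here) is placed immediately after
// the list header in one allocation, so `workNode + 1` points at the payload.
struct ncclWorkListSketch {
  struct ncclWorkListSketch* next;
  int workType;  // kind of work stored in the payload
  int size;      // payload bytes following this header
};
// allocate header + payload in a single block:
//   workNode = alloc(sizeof(header) + sizeof(ncclDevWorkColl));
//   devWork  = (struct ncclDevWorkColl*)(workNode + 1);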
doLaunches
- The main job is two nested loops: the outer one walks each comm group, the inner one handles each comm within the group (see the paraphrased sketch after this list)
  - Each comm is initialized with ncclLaunchPrepare
  - The next kernel plan to execute (ncclKernelPlan) is fetched from comm->planner.unlaunchedPlansHead
  - Before launching the kernel, ncclLaunchKernelBefore_NoUncapturedCuda runs
  - ncclLaunchKernel launches the kernel
  - After the kernel launch, ncclLaunchKernelAfter_NoCuda runs
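- Put together, the control flow is roughly the following paraphrase (field names such as groupNext and the dequeue detail are assumed for illustration; this is not the literal source):
// Paraphrased doLaunches: outer pass over the comms in the group, then an
// inner loop over each comm's unlaunched kernel plans.
for (struct ncclComm* comm = groupCommHead; comm != nullptr; comm = comm->groupNext)
  NCCLCHECK(ncclLaunchPrepare(comm));
for (struct ncclComm* comm = groupCommHead; comm != nullptr; comm = comm->groupNext) {
  while (comm->planner.unlaunchedPlansHead != nullptr) {
    struct ncclKernelPlan* plan = comm->planner.unlaunchedPlansHead; // next plan
    NCCLCHECK(ncclLaunchKernelBefore_NoUncapturedCuda(comm, plan));  // upload work
    NCCLCHECK(ncclLaunchKernel(comm, plan));                         // launch kernel
    NCCLCHECK(ncclLaunchKernelAfter_NoCuda(comm, plan));             // advances the list
  }
}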
ncclLaunchPrepare
- Yet another new structure: ncclKernelPlan... (why are there so many structures!)
- A plan structure is allocated
- Since ours is a collective operation, we enter scheduleCollTasksToPlan(comm, plan, &budget); its skeleton is below. This function mainly:
  - divides up channels and chunks
  - takes each task out of comm's planner queue and appends it to plan->collTaskQueue
  - stores the computed chunk/channel information into the devWork trailing each workNode, then moves the workNode from comm's planner queue into plan->workQueue
  - stores channelMask and other information into the plan
- It then calls finishPlan (code below), which mainly:
  - allocates a kernelArgs structure inside the plan, then sets its comm, channelMask, and workStorageType
  - then hangs all the workBatches from every channel's workBatchQueue, in a fixed order, after kernelArgs (see the layout sketch after this list):
struct ncclDevWorkBatch* batchZero = (struct ncclDevWorkBatch*)(plan->kernelArgs+1);
- Then this plan is enqueued into comm's planQueue
- Dependencies between streams are set up
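- Putting scheduleCollTasksToPlan and finishPlan together, the buffer they build looks like this (my sketch of the layout; sizes not to scale):
// Layout of the host-side buffer rooted at plan->kernelArgs:
//
//   +----------------------------+ <- plan->kernelArgs
//   | struct ncclDevKernelArgs   |    comm, channelMask, workStorageType...
//   +----------------------------+ <- batchZero = (ncclDevWorkBatch*)(plan->kernelArgs+1)
//   | ncclDevWorkBatch [0..n-1]  |    one entry per channel work batch
//   +----------------------------+ <- sizeof(ncclDevKernelArgs) + batchBytes
//   | work structs (devWork ...) |    copied in later by uploadWork
//   +----------------------------+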
scheduleCollTasksToPlan
static ncclResult_t scheduleCollTasksToPlan( /* ... */ )
finishPlan
static void finishPlan(struct ncclComm* comm, struct ncclKernelPlan* plan) { /* ... */ }
ncclLaunchKernelBefore_NoUncapturedCuda
- Calls the uploadWork function
static ncclResult_t uploadWork(struct ncclComm* comm, struct ncclKernelPlan* plan) {
  size_t workBytes = plan->workBytes;
  size_t batchBytes = plan->nWorkBatches*sizeof(struct ncclDevWorkBatch);
  void* fifoBufHost;
  uint32_t fifoCursor, fifoMask;
  switch (plan->workStorageType) {
  case ncclDevWorkStorageTypeArgs:
    // This is our case
    plan->kernelArgs->workBuf = nullptr;
    fifoBufHost = (void*)plan->kernelArgs;
    fifoCursor = sizeof(ncclDevKernelArgs) + batchBytes;
    fifoMask = ~0u;
    break;
  case ncclDevWorkStorageTypeFifo:
    // fifoBufHost = comm->workFifoBuf;
    // fifoCursor = comm->workFifoProduced;
    // fifoMask = comm->workFifoBytes-1;
    // waitWorkFifoAvailable(comm, fifoCursor + workBytes);
    // plan->kernelArgs->workBuf = comm->workFifoBufDev;
    // break;
  case ncclDevWorkStorageTypePersistent:
    // static_assert(16 <= alignof(max_align_t), "We rely on 16-byte alignment.");
    // fifoBufHost = malloc(workBytes);
    // fifoCursor = 0;
    // fifoMask = ~0u;
    // break;
  default:
    return ncclInternalError;
  }
  plan->kernelArgs->workMask = fifoMask;
  // Batches were placed after kernelArgs by finishPlan(). Only thing left to
  // do is translate the work offset from zero based (in plan) to:
  //   ncclDevWorkStorageTypeArgs: offset from beginning of kernel args
  //   ncclDevWorkStorageTypeFifo: offset from base of fifo
  //   ncclDevWorkStorageTypePersistent: no translation since our dedicated buffer will also begin at zero.
  struct ncclDevWorkBatch* batchZero = (struct ncclDevWorkBatch*)(plan->kernelArgs+1);
  for (int b=0; b < plan->nWorkBatches; b++) {
    batchZero[b].offsetBase += fifoCursor;
  }
  // This "FIFO" is on the host side: it points straight at plan->kernelArgs,
  // which also lives in host memory. When the kernel is launched later, the
  // pointer to kernelArgs is passed as a kernel argument, and the contents of
  // kernelArgs get copied into the GPU's shared memory.
  // Write the channel-shared work structs.
  struct ncclWorkList* workNode = ncclIntruQueueHead(&plan->workQueue);
  while (workNode != nullptr) {
    char* dst = (char*)fifoBufHost;
    char* src = (char*)(workNode+1);
    for (int n = workNode->size; n != 0; n -= 16) {
      memcpy(
        __builtin_assume_aligned(dst + (fifoCursor & fifoMask), 16),
        __builtin_assume_aligned(src, 16),
        16
      );
      fifoCursor += 16;
      src += 16;
    }
    workNode = workNode->next;
  }
  switch (plan->workStorageType) {
  case ncclDevWorkStorageTypeFifo:
    // comm->workFifoProduced = fifoCursor;
    // if (comm->workFifoBufGdrHandle != nullptr) wc_store_fence();
    // break;
  case ncclDevWorkStorageTypePersistent:
    // { ncclResult_t result = ncclSuccess;
    //   cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
    //   void* fifoBufDev = nullptr;
    //   CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
    //   // Acquire deviceStream to gain access to deviceStream.cudaStream. Since the
    //   // user's graph will be launched later, and it also acquires the deviceStream,
    //   // it will observe this upload.
    //   NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->deviceStream), result, finish_scope);
    //   CUDACHECKGOTO(cudaMallocAsync(&fifoBufDev, workBytes, comm->memPool, comm->sharedRes->deviceStream.cudaStream), result, finish_scope);
    //   plan->workBufPersistent = fifoBufDev;
    //   plan->kernelArgs->workBuf = fifoBufDev;
    //   CUDACHECKGOTO(cudaMemcpyAsync(fifoBufDev, fifoBufHost, workBytes, cudaMemcpyDefault, comm->sharedRes->deviceStream.cudaStream), result, finish_scope);
    //   cudaEvent_t memcpyDone;
    //   CUDACHECKGOTO(cudaEventCreateWithFlags(&memcpyDone, cudaEventDisableTiming), result, finish_scope);
    //   CUDACHECKGOTO(cudaEventRecord(memcpyDone, comm->sharedRes->deviceStream.cudaStream), result, finish_scope);
    //   struct uploadWork_cleanup_t* cleanup;
    //   NCCLCHECK(ncclCalloc(&cleanup, 1));
    //   cleanup->base.fn = uploadWork_cleanup_fn;
    //   cleanup->base.event = memcpyDone;
    //   cleanup->hostBuf = fifoBufHost;
    //   ncclIntruQueueEnqueue(&comm->eventCallbackQueue, &cleanup->base);
    //   NCCLCHECKGOTO(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->deviceStream), result, finish_scope);
    //   NCCLCHECKGOTO(ncclCommPollEventCallbacks(comm), result, finish_scope);
    // finish_scope:
    //   CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
    //   if (result != ncclSuccess) return result;
    // } break;
  default: break;
  }
  return ncclSuccess;
}
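- One detail worth noting is the (fifoCursor & fifoMask) indexing in the copy loop: with Args/Persistent storage the mask is all ones, so the AND is a no-op, while Fifo storage uses a power-of-two size so the same AND wraps the cursor around the ring. A standalone demonstration (my illustration):
#include <cstdio>
#include <cstdint>
int main() {
  uint32_t argsMask = ~0u;                     // ncclDevWorkStorageTypeArgs: identity
  printf("%u\n", 12345u & argsMask);           // 12345: no wrap
  uint32_t fifoBytes = 1u << 20;               // fifo sizes are powers of two
  uint32_t fifoMask = fifoBytes - 1;           // ncclDevWorkStorageTypeFifo
  printf("%u\n", (fifoBytes + 16) & fifoMask); // 16: cursor wrapped around
  return 0;
}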
ncclLaunchKernel
ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan) { /* ... */ }
- Calls cuLaunchKernel (or cuLaunchKernelEx); the kernel is handed to the GPU asynchronously and the CPU can return
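- Handing everything over in one packed buffer uses the driver API's extra parameter instead of individual kernel parameters. A sketch of the mechanism (the general CUDA driver-API pattern, not NCCL's exact launch code; the function and its parameters are placeholders):
#include <cuda.h>
// Launch a kernel whose arguments are one packed buffer, passed via `extra`.
static CUresult launchPacked(CUfunction fn, void* argsBuf, size_t argsBytes,
                             unsigned gridDim, unsigned blockDim,
                             unsigned smemBytes, CUstream stream) {
  void* extra[] = {
    CU_LAUNCH_PARAM_BUFFER_POINTER, argsBuf,    // e.g. plan->kernelArgs
    CU_LAUNCH_PARAM_BUFFER_SIZE,    &argsBytes, // total bytes of args+batches+work
    CU_LAUNCH_PARAM_END
  };
  return cuLaunchKernel(fn, gridDim, 1, 1, blockDim, 1, 1,
                        smemBytes, stream, /*kernelParams=*/nullptr, extra);
}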
Contents of the generated file
- As described in "NCCL Code Reading - 01", CUDA now looks up the function table and, following the code in the generated file, enters ncclKernelMain
- Note that we get here via cuLaunchKernel or cuLaunchKernelEx; the launch specified the grid and block dimensions, which determine how many thread blocks are assigned
- Also, at launch time the actual work description is passed in through extra
// The call as it looks after the template parameters are instantiated
ncclKernelMain<240, RunWorkBatch<ncclFuncAllReduce, uint64_t, FuncSum<uint64_t>,
  NCCL_ALGO_RING, NCCL_PROTO_LL>>(&args4K.args)
ncclKernelMain
- The explanations are all written in the comments
template<int SpecializedFnId, typename SpecializedRunWorkBatch>
__device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* args) {
  //////////////////////////////////////////////////////////////////
  // In our case:
  // SpecializedFnId = 240
  // SpecializedRunWorkBatch = RunWorkBatch<ncclFuncAllReduce, uint64_t,
  //   FuncSum<uint64_t>, NCCL_ALGO_RING, NCCL_PROTO_LL>
  //////////////////////////////////////////////////////////////////
  // tid is this thread's index within its thread block; for background see
  // the "CUDA Programming Model" chapter
  int tid = threadIdx.x;
  // tn is the total number of threads in this block
  int tn = blockDim.x;
  // Place some read-only kernel arguments into shared memory; if we didn't do
  // this explicitly, the compiler would keep them in each thread's own stack,
  // which wastes a lot of space.
  // A group of threads splits the arguments up, each thread copying one piece
  // into shared memory; the __syncthreads() below marks the copy as done.
  if (tid < sizeof(ncclDevKernelArgs)/sizeof(uint32_t)) {
    ((uint32_t*)&ncclShmem.args)[tid] = ((uint32_t*)args)[tid];
  }
  // args->channelMask is a bitmask: with 64 channels there would be 64 bits,
  // where bit x set means channel x is enabled.
  // (1ull<<tid) shifts 1 left by tid bits (treat tid as a plain number here),
  // so only bit tid is set.
  // The if condition requires tid < MAXCHANNELS (which is 32) and that
  // channel tid is enabled.
  // ((1ull<<tid)-1) sets every bit below tid and clears the rest:
  // e.g. if (1ull<<tid) is 0b0100 (tid = 2), subtracting 1 gives 0b0011.
  // args->channelMask & ((1ull<<tid)-1) keeps only the enabled channels below tid.
  // __popcll(x) counts the 1 bits in x, so
  // __popcll(args->channelMask & ((1ull<<tid)-1)) counts how many enabled
  // channels have an index below tid.
  // Finally, if n equals the block index, this block takes channel tid.
  // This scheme dynamically assigns channels to thread blocks: within a block,
  // exactly one thread's tid can be picked as the block's channel, because
  // (for example) the number of enabled channels below channel 2 necessarily
  // differs from the number below channel 1 (given channel 1 is enabled), so
  // two tids can never both have n equal to the block index.
  if (tid < MAXCHANNELS && (args->channelMask & (1ull<<tid))) {
    int n = __popcll(args->channelMask & ((1ull<<tid)-1));
    if (blockIdx.x == n) ncclShmem.channelId = tid;
  }
  __syncthreads(); // publish ncclShmem.{args, channelId}
  /* set abort flag to 0 */
  if (tid == 0) ncclShmem.aborted = 0;
  // The first two warps copy the comm and channel control structs into shared
  // memory; the remaining warps load the work batch
  switch (tid/WARP_SIZE) {
  case 0:
    { void* dst = &ncclShmem.comm;
      void* src = ncclShmem.args.comm;
      int bytes = sizeof(ncclDevComm);
      static_assert(sizeof(ncclDevComm) <= 16*WARP_SIZE, "ncclDevComm cannot be loaded by a single warp in one insn.");
      copyToShmem16(tid, dst, src, bytes);
    } break;
  case 1:
    { // Get address of channel without incurring indirect load from ncclDevComm::channels
      void* dst = &ncclShmem.channel;
      void* src = &((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId];
      int bytes = sizeof(ncclDevChannel);
      static_assert(sizeof(ncclDevChannel) <= 16*WARP_SIZE, "ncclDevChannel cannot be loaded by a single warp in one insn.");
      copyToShmem16(tid-WARP_SIZE, dst, src, bytes);
    } break;
  default:
    { int subtid = tid - 2*WARP_SIZE;
      int subtn = tn - 2*WARP_SIZE;
      // Coverity reports a possible thread divergence due to not all threads participating in the collective.
      // However, the code ensures that the participation is on a per-warp basis.
      // coverity[device_thread_diverged:FALSE]
      loadWorkBatchToShmem(subtid, subtn, args, /*batchIx=*/blockIdx.x);
    } break;
  }
  __syncthreads(); // publish ncclShmem
  // In our example, workStorageType is ncclDevWorkStorageTypeArgs
  if (tid == 0 && ncclShmem.args.workStorageType == ncclDevWorkStorageTypeFifo) {
    // ncclShmem.workConsumed written by loadWorkBatchToShmem before __syncthreads()
    ncclShmem.comm.workConsumed[ncclShmem.channelId] = ncclShmem.workConsumed;
  }
  while (true) {
    if (0 <= SpecializedFnId && ncclShmem.funcId == (unsigned)SpecializedFnId) {
      // The actual execution: RunWorkBatch<ncclFuncAllReduce, uint64_t,
      // FuncSum<uint64_t>, NCCL_ALGO_RING, NCCL_PROTO_LL>().run()
      SpecializedRunWorkBatch().run();
    } else {
      ncclDevFuncTable[ncclShmem.funcId](); // every thread executes this call
    }
    if (ncclShmem.nextBatchIx == -1) break;
    int batchIx = ncclShmem.nextBatchIx;
    __syncthreads();
    loadWorkBatchToShmem(tid, tn, args, batchIx);
    // Check whether the last operation was aborted and make sure all threads exit
    bool aborted = false;
    if (tid == 0) aborted = *ncclShmem.comm.abortFlag;
    aborted = barrier_red_or_aligned(aborted, 0); // publish ncclShmem.work
    if (tid == 0 && ncclShmem.args.workStorageType == ncclDevWorkStorageTypeFifo) {
      // ncclShmem.workConsumed written by loadWorkBatchToShmem before barrier_red_or()
      ncclShmem.comm.workConsumed[ncclShmem.channelId] = ncclShmem.workConsumed;
    }
    if (aborted) break;
  }
}
- As you can see, the core execution call is SpecializedRunWorkBatch().run() (the two branches above do the same thing; just look it up in the table mentioned in 01)
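- The channel-assignment trick at the top of ncclKernelMain is easy to verify on the host: block n simply takes the n-th set bit of channelMask (my standalone illustration):
#include <cstdio>
#include <cstdint>
int main() {
  uint64_t channelMask = 0b101101ull;  // channels 0, 2, 3, 5 enabled
  int nBlocks = __builtin_popcountll(channelMask);
  for (int block = 0; block < nBlocks; block++) {
    for (int tid = 0; tid < 64; tid++) {
      if (!((channelMask >> tid) & 1)) continue;  // channel tid disabled
      int n = __builtin_popcountll(channelMask & ((1ull << tid) - 1));
      if (n == block) printf("block %d -> channel %d\n", block, tid);
    }
  }
  return 0;  // prints 0->0, 1->2, 2->3, 3->5
}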
RunWorkBatch
template<ncclFunc_t Fn, typename T, typename RedOp, int Algo, int Proto> /* ... */
To find out what happens next, see NCCL Code Reading - 06.