MPI中无用的延迟无阻塞消息传递:针对那些“有点儿”的人的轻便分析和教程

最近,我不得不从老师那里解决另一个琐碎的培训任务。 但是,解决了这个问题后,我设法吸引了我以前从未想过的事情的注意,也许您也没有考虑过。 本文将对学生和使用MPI进入并行编程世界的每个人都更有帮助。



我们的“给出:”


因此,我们本质上计算任务的本质是比较使用无阻塞,延迟的点对点传输的程序比使用阻塞的点对点传输的程序快多少倍。 我们将对尺寸为64、256、1024、4096、8192、16384、65536、262144、1048576、4194304、16777216和33554432元素的输入数组进行测量。 默认情况下,建议通过四个过程来解决它。 实际上,这就是我们要考虑的内容:



在输出中,我们应该获得三个向量:Y1,Y2和Y3,这将归零。 我将在基于具有16 GB RAM 的Intel处理器的系统上测试整个过程。 为了开发程序,我们将使用Microsoft版本9.0.1 (在撰写本文时,是相关的),Visual Studio Community 2017而非Fortran的MPI标准的实现。

物资


我不想详细描述将使用的MPI函数如何工作,您可以随时查看它的文档 ,因此,我仅简要概述我们将使用的功能。

阻止交换


为了阻止点对点消息传递,我们将使用以下功能:

MPI_Send-实现阻止消息发送,即 调用该函数后,该过程将被阻塞,直到发送给它的数据从其内存写入MPI内部系统缓冲区为止,此后,该过程将继续工作。
MPI_Recv-执行阻止消息接收,即 调用该函数后,该过程将被阻塞,直到来自发送过程的数据到达为止,直到MPI环境将该数据完全写入接收过程的缓冲区为止。

延迟非阻塞交换


对于延迟的非阻塞点对点消息传递,我们将使用以下功能:

MPI_Send_init-在后台为发送数据而准备环境,该数据将来会发生并且没有锁;
MPI_Recv_init-此功能与上一个功能相似,仅这次是接收数据;
MPI_Start-启动接收或发送消息的过程,它也在后台运行。 没有阻碍
MPI_Wait-用于检查,并在必要时等待发送或接收消息的完成,但是它仅在必要时阻止该过程(如果数据是“未发送”或“未接收”)。 例如,一个进程想要使用尚未到达的数据-不好,因此我们在需要此数据的位置之前插入MPI_Wait(即使有数据损坏的危险,我们也会插入它)。 另一个示例,该过程开始进行后台数据传输,并且在开始数据传输之后,它立即开始以某种方式更改此数据-不好,因此我们将MPI_Wait插入程序中开始更改此数据的位置的前面(即使在此情况下,我们也会插入它,仅存在数据损坏的风险)。

因此,从语义上讲,具有无阻塞延迟交换的呼叫序列如下:

  1. MPI_Send_init / MPI_Recv_init-准备用于接收或发送的环境
  2. MPI_Start-开始接收/发送过程
  3. MPI_Wait-我们认为传输或接收的数据有损坏的风险(包括“不足”和“不足”报告)

我还在测试程序中使用了MPI_StartallMPI_Waitall ,它们的含义基本上分别与MPI_Start和MPI_Wait相同,只是它们在多个程序包和/或传输上运行。 但这不是启动和等待功能的完整列表,还有更多用于检查操作完整性的功能。

进程间架构


为了清楚起见,我们构造了一个图形,用于通过四个过程执行计算。 在这种情况下,应该尝试将所有向量算术运算相对平均地分布在各个进程中。 这是我得到的:



看到这些阵列T0-T2吗? 这些是用于存储中间操作结果的缓冲区。 同样,在该图上,当从一个进程向另一个进程发送消息时,箭头的开头是要传输数据的数组的名称,箭头的末尾是接收此数据的数组。

好吧,我们什么时候终于回答了这些问题:

  1. 我们正在解决什么样的问题?
  2. 我们将使用什么工具来解决它?
  3. 我们将如何解决?

它仍然只是解决它...

我们的“解决方案:”


接下来,我将介绍上面讨论的两个程序的代码,但首先,我将给出一些关于什么以及如何做的解释。

为了增加代码的可读性,我在单独的过程(add,sub,mul,div)中取出了所有矢量算术运算。 所有输入数组均根据我几乎随机指示的公式进行初始化。 由于零进程从所有其他进程收集工作结果,因此它的工作时间最长,因此在第一种情况和第二种情况下将其工作时间与程序的运行时间相等是合乎逻辑的(我们记得,我们感兴趣的是:算术+消息传递)。 我们将使用MPI_Wtime函数测量时间间隔同时我决定使用MPI_Wtick显示我在那里的手表的分辨率(在我灵魂深处的某个地方,我希望它们适合我的不变TSC,在这种情况下,我什至愿意原谅他们的错误与该函数被称为MPI_Wtime的时间相关联。 因此,我们将上面我写的所有内容放在一起,并根据该图最终开发这些程序(当然也要调试)。



谁在乎看代码:

阻止数据传输的程序
#include "pch.h" #include <iostream> #include <iomanip> #include <fstream> #include <mpi.h> using namespace std; void add(double *A, double *B, double *C, int n); void sub(double *A, double *B, double *C, int n); void mul(double *A, double *B, double *C, int n); void div(double *A, double *B, double *C, int n); int main(int argc, char **argv) { if (argc < 2) { return 1; } int n = atoi(argv[1]); int rank; double start_time, end_time; MPI_Status status; double *A = new double[n]; double *B = new double[n]; double *C = new double[n]; double *D = new double[n]; double *E = new double[n]; double *G = new double[n]; double *T0 = new double[n]; double *T1 = new double[n]; double *T2 = new double[n]; for (int i = 0; i < n; i++) { A[i] = double (2 * i + 1); B[i] = double(2 * i); C[i] = double(0.003 * (i + 1)); D[i] = A[i] * 0.001; E[i] = B[i]; G[i] = C[i]; } cout.setf(ios::fixed); cout << fixed << setprecision(9); MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if (rank == 0) { start_time = MPI_Wtime(); sub(A, B, T0, n); MPI_Send(T0, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD); MPI_Send(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD); div(T0, G, T1, n); MPI_Recv(T2, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &status); add(T1, T2, T0, n); mul(T0, T1, T2, n); MPI_Recv(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &status); MPI_Send(T2, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD); add(T0, T2, T1, n); MPI_Recv(T0, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &status); MPI_Recv(T2, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &status); end_time = MPI_Wtime(); cout << "Clock resolution: " << MPI_Wtick() << " secs" << endl; cout << "Thread " << rank << " execution time: " << end_time - start_time << endl; } if (rank == 1) { add(C, C, T0, n); MPI_Recv(T1, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &status); MPI_Send(T0, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD); mul(T1, G, T2, n); add(T2, C, T0, n); MPI_Recv(T1, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &status); MPI_Send(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD); sub(T1, T0, T2, n); MPI_Recv(T0, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &status); add(T0, T2, T1, n); MPI_Send(T1, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD); } if (rank == 2) { mul(C, C, T0, n); MPI_Recv(T1, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &status); MPI_Recv(T2, n, MPI_DOUBLE, 3, 0, MPI_COMM_WORLD, &status); MPI_Send(T0, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD); MPI_Send(T0, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD); add(T1, T2, T0, n); mul(T0, G, T1, n); MPI_Recv(T2, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &status); mul(T1, T2, T0, n); MPI_Recv(T1, n, MPI_DOUBLE, 3, 0, MPI_COMM_WORLD, &status); mul(T0, T1, T2, n); MPI_Send(T2, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD); } if (rank == 3) { mul(E, D, T0, n); MPI_Send(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD); sub(T0, B, T1, n); mul(T1, T1, T2, n); sub(T1, G, T0, n); mul(T0, T2, T1, n); MPI_Send(T1, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD); } MPI_Finalize(); delete[] A; delete[] B; delete[] C; delete[] D; delete[] E; delete[] G; delete[] T0; delete[] T1; delete[] T2; return 0; } void add(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] + B[i]; } } void sub(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] - B[i]; } } void mul(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] * B[i]; } } void div(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] / B[i]; } } 

延迟无阻塞数据传输的程序
 #include "pch.h" #include <iostream> #include <iomanip> #include <fstream> #include <mpi.h> using namespace std; void add(double *A, double *B, double *C, int n); void sub(double *A, double *B, double *C, int n); void mul(double *A, double *B, double *C, int n); void div(double *A, double *B, double *C, int n); int main(int argc, char **argv) { if (argc < 2) { return 1; } int n = atoi(argv[1]); int rank; double start_time, end_time; MPI_Request request[7]; MPI_Status statuses[4]; double *A = new double[n]; double *B = new double[n]; double *C = new double[n]; double *D = new double[n]; double *E = new double[n]; double *G = new double[n]; double *T0 = new double[n]; double *T1 = new double[n]; double *T2 = new double[n]; for (int i = 0; i < n; i++) { A[i] = double(2 * i + 1); B[i] = double(2 * i); C[i] = double(0.003 * (i + 1)); D[i] = A[i] * 0.001; E[i] = B[i]; G[i] = C[i]; } cout.setf(ios::fixed); cout << fixed << setprecision(9); MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if (rank == 0) { start_time = MPI_Wtime(); MPI_Send_init(T0, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request[0]);// MPI_Send_init(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[1]);// MPI_Recv_init(T2, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request[2]);// MPI_Recv_init(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[3]);// MPI_Send_init(T2, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request[4]);// MPI_Recv_init(T0, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request[5]);// MPI_Recv_init(T2, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[6]);// MPI_Start(&request[2]); sub(A, B, T0, n); MPI_Startall(2, &request[0]); div(T0, G, T1, n); MPI_Waitall(3, &request[0], statuses); add(T1, T2, T0, n); mul(T0, T1, T2, n); MPI_Startall(2, &request[3]); MPI_Wait(&request[3], &statuses[0]); add(T0, T2, T1, n); MPI_Startall(2, &request[5]); MPI_Wait(&request[4], &statuses[0]); MPI_Waitall(2, &request[5], statuses); end_time = MPI_Wtime(); cout << "Clock resolution: " << MPI_Wtick() << " secs" << endl; cout << "Thread " << rank << " execution time: " << end_time - start_time << endl; } if (rank == 1) { MPI_Recv_init(T1, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[0]);// MPI_Send_init(T0, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[1]);// MPI_Recv_init(T1, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[2]);// MPI_Send_init(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[3]);// MPI_Recv_init(T0, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[4]);// MPI_Send_init(T1, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[5]);// MPI_Start(&request[0]); add(C, C, T0, n); MPI_Start(&request[1]); MPI_Wait(&request[0], &statuses[0]); mul(T1, G, T2, n); MPI_Start(&request[2]); MPI_Wait(&request[1], &statuses[0]); add(T2, C, T0, n); MPI_Start(&request[3]); MPI_Wait(&request[2], &statuses[0]); sub(T1, T0, T2, n); MPI_Wait(&request[3], &statuses[0]); MPI_Start(&request[4]); MPI_Wait(&request[4], &statuses[0]); add(T0, T2, T1, n); MPI_Start(&request[5]); MPI_Wait(&request[5], &statuses[0]); } if (rank == 2) { MPI_Recv_init(T1, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[0]);// MPI_Recv_init(T2, n, MPI_DOUBLE, 3, 0, MPI_COMM_WORLD, &request[1]);// MPI_Send_init(T0, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request[2]);// MPI_Send_init(T0, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[3]);// MPI_Recv_init(T2, n, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD, &request[4]);// MPI_Recv_init(T1, n, MPI_DOUBLE, 3, 0, MPI_COMM_WORLD, &request[5]);// MPI_Send_init(T2, n, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &request[6]);// MPI_Startall(2, &request[0]); mul(C, C, T0, n); MPI_Startall(2, &request[2]); MPI_Waitall(4, &request[0], statuses); add(T1, T2, T0, n); MPI_Start(&request[4]); mul(T0, G, T1, n); MPI_Wait(&request[4], &statuses[0]); mul(T1, T2, T0, n); MPI_Start(&request[5]); MPI_Wait(&request[5], &statuses[0]); mul(T0, T1, T2, n); MPI_Start(&request[6]); MPI_Wait(&request[6], &statuses[0]); } if (rank == 3) { MPI_Send_init(T0, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[0]); MPI_Send_init(T1, n, MPI_DOUBLE, 2, 0, MPI_COMM_WORLD, &request[1]); mul(E, D, T0, n); MPI_Start(&request[0]); sub(T0, B, T1, n); mul(T1, T1, T2, n); MPI_Wait(&request[0], &statuses[0]); sub(T1, G, T0, n); mul(T0, T2, T1, n); MPI_Start(&request[1]); MPI_Wait(&request[1], &statuses[0]); } MPI_Finalize(); delete[] A; delete[] B; delete[] C; delete[] D; delete[] E; delete[] G; delete[] T0; delete[] T1; delete[] T2; return 0; } void add(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] + B[i]; } } void sub(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] - B[i]; } } void mul(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] * B[i]; } } void div(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] / B[i]; } } 



测试与分析


让我们为不同大小的数组运行程序,看看会发生什么。 表中总结了测试结果,我们在表的最后一栏中计算并编写了加速度系数,定义如下:K accele = T ex。 无障碍。 / T 块。



如果您比平时更仔细地查看此表,您会注意到,随着处理元素数量的增加,加速度系数会以某种方式降低:



让我们尝试确定是怎么回事? 为此,我建议编写一个小型测试程序,该程序将测量每个矢量算术运算的时间,并仔细将结果缩小为普通的文本文件。



实际上,这里是程序本身:

时间测量
 #include "pch.h" #include <iostream> #include <iomanip> #include <Windows.h> #include <fstream> using namespace std; void add(double *A, double *B, double *C, int n); void sub(double *A, double *B, double *C, int n); void mul(double *A, double *B, double *C, int n); void div(double *A, double *B, double *C, int n); int main() { struct res { double add; double sub; double mul; double div; }; int i, j, k, n, loop; LARGE_INTEGER start_time, end_time, freq; ofstream fout("test_measuring.txt"); int N[12] = { 64, 256, 1024, 4096, 8192, 16384, 65536, 262144, 1048576, 4194304, 16777216, 33554432 }; SetConsoleOutputCP(1251); cout << "   loop: "; cin >> loop; fout << setiosflags(ios::fixed) << setiosflags(ios::right) << setprecision(9); fout << " : " << loop << endl; fout << setw(10) << "\n " << setw(30) << ".   (c)" << setw(30) << ".   (c)" << setw(30) << ".  (c)" << setw(30) << ".   (c)" << endl; QueryPerformanceFrequency(&freq); cout << "\n : " << freq.QuadPart << " " << endl; for (k = 0; k < sizeof(N) / sizeof(int); k++) { res output = {}; n = N[k]; double *A = new double[n]; double *B = new double[n]; double *C = new double[n]; for (i = 0; i < n; i++) { A[i] = 2.0 * i; B[i] = 2.0 * i + 1; C[i] = 0; } for (j = 0; j < loop; j++) { QueryPerformanceCounter(&start_time); add(A, B, C, n); QueryPerformanceCounter(&end_time); output.add += double(end_time.QuadPart - start_time.QuadPart) / double(freq.QuadPart); QueryPerformanceCounter(&start_time); sub(A, B, C, n); QueryPerformanceCounter(&end_time); output.sub += double(end_time.QuadPart - start_time.QuadPart) / double(freq.QuadPart); QueryPerformanceCounter(&start_time); mul(A, B, C, n); QueryPerformanceCounter(&end_time); output.mul += double(end_time.QuadPart - start_time.QuadPart) / double(freq.QuadPart); QueryPerformanceCounter(&start_time); div(A, B, C, n); QueryPerformanceCounter(&end_time); output.div += double(end_time.QuadPart - start_time.QuadPart) / double(freq.QuadPart); } fout << setw(10) << n << setw(30) << output.add / loop << setw(30) << output.sub / loop << setw(30) << output.mul / loop << setw(30) << output.div / loop << endl; delete[] A; delete[] B; delete[] C; } fout.close(); cout << endl; system("pause"); return 0; } void add(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] + B[i]; } } void sub(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] - B[i]; } } void mul(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] * B[i]; } } void div(double *A, double *B, double *C, int n) { for (size_t i = 0; i < n; i++) { C[i] = A[i] / B[i]; } } 



在启动时,它要求您输入我测试了10,000个周期的测量周期数。 在输出中,我们获得每个操作的平均结果:



为了测量时间,我使用了高级QueryPerformanceCounter 。 我强烈建议您阅读此常见问题解答,以便大多数有关使用此功能测量时间的问题会自行消失。 根据我的观察,它依附于TSC(但理论上可能并非如此),但根据帮助返回了计数器的当前滴答数。 但是事实是我的计数器实际上无法测量32 ns的时间间隔(请参见结果表的第一行)。 此结果是由于在两次QueryPerformanceCounter调用之间传递了0个滴答声或1个滴答声,对于表的第一行,我们只能得出结论,即10,000个结果中约有三分之一等于1个滴答声。 因此,此表中用于64、256甚至1024个元素的数据非常近似。 现在,让我们打开任何程序,并计算每种类型遇到的总操作数,传统上,我们将根据下表“扩展”所有内容:



最后,我们知道每个向量算术运算的时间以及它在程序中的多少,尝试找出在并行程序中花在这些运算上的时间是多少,以及在进程之间阻塞和延迟非阻塞数据交换上所花费的时间,为清楚起见,我们将其减少为表:



根据获得的数据结果,我们构建了三个函数的图形:第一个描述从数组元素的数量开始的进程之间阻塞传输所花费的时间变化,第二个描述了数组之间的进程之间延迟非阻塞传输所花费的时间变化,数组元素的数量以及第三个描述时间的变化,从数组的元素数上花费在算术运算上:



正如您已经注意到的,图形的垂直比例是对数的,这是必要的度量,因为 时间分散性太大,在常规图表上什么也看不到。 注意依赖于算术时间的函数依赖于元素数量,它可以安全地超过其他两个函数约一百万个元素。 问题是它比两个对手更快地增长到无限。 因此,随着处理元素数量的增加,程序的运行时间越来越多地由算术而不是传输来确定。 假设您增加了进程之间的传输次数,从概念上讲,您只会看到算术函数超过其他两个的那一刻将在以后发生。

总结


因此,继续增加数组的长度,您将得出结论,具有延迟的非阻塞传输的程序将仅比使用阻塞交换的程序快一点。 而且,如果将数组的长度指定为无穷大(或者,或者仅取很长的数组),那么程序的运行时间将由计算确定为100%,并且加速度系数将安全地趋于1。

Source: https://habr.com/ru/post/zh-CN427219/


All Articles