代码之家  ›  专栏  ›  技术社区  ›  Terra Omega

Pthreads:我的并行代码在一定数量后不会将线程传递到函数中

  •  2
  • Terra Omega  · 技术社区  · 6 年前

    我目前正在使用MPI创建一个并行化代码,以找出任何给定图形中有多少个三角形。到目前为止,我的代码能够成功地获得正确数量的三角形(我知道,因为我有相同代码的序列化版本,运行速度要慢得多),直到某一点。我想说,在大约6000个节点之后,我的线程不再在函数中被读取(我通过查看主函数中的线程,而不是计算线程是否进入函数中,发现了这一点)。

    作为参考,代码本身包含许多节点、边、种子和度。

    为此,我们将忽略种子和程度,因为当一切正常时,它们工作得非常好。

    编辑:我将快速解释那些丢失的变量的命名约定。本质上,我们得到了一个由多个节点以及连接它们的边组成的图(想想邻接列表)。现在,程序的任务是遍历每个顶点u,并在图中取另一个顶点v,以确定它们之间是否有相应的顶点w。在这种情况下,由于C中没有布尔值,我们将对uv、uw和vw边使用ints。这样,如果有一个连接边,那么我们可以将其转换为1。如果它们都是1,那么我们找到了一个三角形,现在可以将其添加到全局变量中。让我们知道,在for循环中计算三角形的代码是百分之百正确的,而不是问题所在。相反,这个问题涉及到在更高节点上使用pthread的问题。

    下面是代码:

    #include <stdio.h>
    #include <stdlib.h>
    #include <math.h>
    #include <time.h>
    
    #include "graph.h"
    
    #define MAX_N       1000000
    #define MAX_E       2*MAX_N // must be at least twice MAX_N
    
    GRAPH_t * G;    
    #define MAX_THREADS     65536
    #include <pthread.h>
    
    
    int thread_id[MAX_THREADS]; // User defined id for thread
    pthread_t p_threads[MAX_THREADS];// Threads
    pthread_attr_t attr;        // Thread attributes 
    
    pthread_mutex_t lock_count; // Protects minimum, count
    
    unsigned int parallelCount = 0;
    unsigned int threadCount = 0;
    void *count_ParallelTriangles(void *threadID) {
    
      int u = *((int *)threadID);
      int counter = 0;
    
      unsigned int v, w, e, uv, uw, vw;
      for (v = u + 1; v < G->n; v++) {
        uv = 0;
        for (e = G->V_ptr[v]; e < G->V_ptr[v + 1]; e++) {
            if (G->E_v[e] == u) {
                uv = 1;         // Edge (u,v) exists
            }
        }
        if (uv == 1) {
            for (w = v + 1; w < G->n; w++) {
                uw = 0; vw = 0;
                for (e = G->V_ptr[w]; e < G->V_ptr[w + 1]; e++) {
                    if (G->E_v[e] == u) uw = 1;     // Edge (u,w) exists
                    if (G->E_v[e] == v) vw = 1;     // Edge (v,w) exists
                }
                if ((uv == 1) && (vw == 1) && (uw == 1)) {
                    counter += 1;
                }
            }
        }
      }
      //if (counter > 0) {
        pthread_mutex_lock(&lock_count);
        threadCount += 1;
        parallelCount += counter;
        pthread_mutex_unlock(&lock_count);
      //}
    
      pthread_exit(NULL);
    }
    

    下面是主要的调用位置

    int main(int argc, char *argv[]) {
    
    struct timespec start, stop;
    float time_serial;
    
    unsigned int num_nodes, num_edges, seed, num_triangles, max_degree;
    
    if (argc != 5) {
      printf("Use: <executable_name> <num_nodes> <num_edges> <seed> 
      <max_degree>\n"); 
      exit(0);
    }
    if ((num_nodes = atoi(argv[argc-4])) > MAX_N) {
      printf("Maximum number of nodes allowed: %u\n", MAX_N);
      exit(0);
    }; 
    if ((num_edges = atoi(argv[argc-3])) > MAX_E) {
      printf("Maximum number of edges allowed: %u\n", MAX_E);
      exit(0);
    }; 
    if (num_edges < 2*num_nodes) {
      num_edges = 2*num_nodes;
      printf("Number of edges must be at least twice the number of nodes: changing 
      num_edges to %u\n", num_edges);
      exit(0);
    }; 
    seed = atoi(argv[argc-2]);
    max_degree = atoi(argv[argc-1]);
    
    // Initialize graph
    G = init_graph ( num_nodes, num_edges, seed, max_degree );
    
    float time_parallel;
    
    //Initialize Pthread
    pthread_mutex_init(&lock_count, NULL);
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    
    clock_gettime(CLOCK_REALTIME, &start);
    for (int u = 0; u < num_nodes; u++) {
        thread_id[u] = u;
        pthread_create(&p_threads[u], &attr, count_ParallelTriangles, (void 
      *)&thread_id[u]);
      }
      for (int u = 0; u < num_nodes; u++) {
        pthread_join(p_threads[u], NULL);
      }
    
      clock_gettime(CLOCK_REALTIME, &stop);
    
      time_parallel = (stop.tv_sec - start.tv_sec)
        + 0.000000001*(stop.tv_nsec - start.tv_nsec);
      printf("Thread active: %d\n", threadCount);
      printf("Parallel execution time = %f s\n", time_parallel);
      // Print results
      printf("G: Nodes = %u, Edges = %u, Triangles = %u\n", G->n, G->m, parallelCount);
    
      pthread_attr_destroy(&attr);
      pthread_mutex_destroy(&lock_count);
    
      return 0;
    }
    

    最后,这是正确运行时的输出。将边设置为最大1000000,以便计算其平均度数(在本例中为234110)内可容纳的最大边数。

    ./triangles.exe 4096 1000000 0 0
    Serial execution time = 16.181034 s
    G: Nodes = 4096, Edges = 234110, Triangles = 651015
    Thread active: 4096
    Parallel execution time = 0.843587 s
    G: Nodes = 4096, Edges = 234110, Triangles = 651015
    

    我们可以看到,由于线程数量等于声明的节点数量,因此上述方法工作正常。然而,如果我们只增加几千个节点,我们会发现输出不再正常运行,尽管速度仍然很快:

    ./triangles.exe 6000 1000000 0 0
    Serial execution time = 48.326824 s
    G: Nodes = 6000, Edges = 413845, Triangles = 1207058
    Thread active: 2061
    Parallel execution time = 1.471421 s
    G: Nodes = 6000, Edges = 413845, Triangles = 1079834
    

    在上面的例子中,如果我们再运行这个示例几次,那么每次调用之间的线程数量都会发生变化,并且它计算的三角形也会随之发生变化(因为每个三角形计数都取决于将其正确传递给全局变量的线程)。序列化的计数和时间将保持相对一致,因为它已经是正确的。

    编辑: 为主文件添加了更多代码。

    下面是用于创建图形的头文件

    typedef struct _graph {
      unsigned int n;       // Number of vertices in the graph
      unsigned int m;       // Number of edges in the graph
      unsigned int * E_u;       // Edge i = (E_u[i],E_v[i]) 
      unsigned int * E_v;       // 
      unsigned int * V_ptr; // Edges incident on vertex u 
      // have ids V_ptr[u] ... V_ptr[u+1]-1
    } GRAPH_t; 
    
    GRAPH_t * init_graph ( unsigned int n, unsigned int m, unsigned int seed, 
    unsigned int max_degree ) {
    GRAPH_t * G = (GRAPH_t *) calloc(1, sizeof(GRAPH_t)); 
    unsigned u, v, e, nbrs, first, lastplus1, maxvalue;
    double fraction;
    G->n = n;
    G->E_u = (unsigned int *) calloc(m, sizeof(unsigned int)); 
    G->E_v = (unsigned int *) calloc(m, sizeof(unsigned int)); 
    G->V_ptr = (unsigned int *) calloc((G->n+1), sizeof(unsigned int)); 
    
    srand48(seed); 
    unsigned int count = 0; 
    // Generate edges 
    G->V_ptr[0] = count;
    
    for (u = 1; u < G->n; u++) {
    
    G->V_ptr[u] = count;
    
    switch (max_degree) {
        case 0:         // max_degree = 0 => max_degree = sqrt(n)
        nbrs = sqrt(G->n); if (nbrs > u) nbrs = u; 
        break;
        default:
        nbrs = max_degree; if (nbrs > u) nbrs = u;
        break;
     }
    
     first = G->V_ptr[u]; 
     lastplus1 = first + nbrs; if (lastplus1 > m) lastplus1 = m;
    
     if (first < lastplus1) {
    
       for (e = first; e < lastplus1; e++) 
         G->E_v[e] = ((unsigned int) lrand48()) % G->n;
    
         maxvalue = G->E_v[first]; 
         for (e = first+1; e < lastplus1; e++) {
           G->E_v[e] += G->E_v[e-1];
           maxvalue = G->E_v[e];
         }
    
         for (e = first; e < lastplus1; e++) {
           fraction = ((double) G->E_v[e])/(maxvalue+1+(lrand48()%G->n));
           G->E_v[e] = (unsigned int) (fraction * u); 
         }
    
         // Generate edges incident at u 
         G->E_u[count] = u; 
         G->E_v[count] = G->E_v[count]; 
         count++;
         for (e = first+1; e < lastplus1; e++) {
          if (G->E_v[count-1] < G->E_v[e]) {
             G->E_u[count] = u; 
             G->E_v[count] = G->E_v[e]; 
             count++;
           }
         }
       }
     }
     G->V_ptr[n] = count;
     G->m = count-1;        // Initialize number of edges
    
     // Check graph
     for (u = 0; u < G->n; u++) {
       if (G->V_ptr[u] > G->V_ptr[u+1]) {
          printf("Graph generation problem - 1!!!\n"); 
          exit(0); 
       }
       for (e = G->V_ptr[u]; e < G->V_ptr[u+1]; e++) {
          if (G->E_u[e] != u) {
            printf("Graph generation problem - 2!!!\n"); 
            exit(0); 
          }
          if (G->E_v[e] >= u) {
            printf("Graph generation problem - 3!!!\n"); 
            exit(0); 
          }
          if ((e > G->V_ptr[u]) && (G->E_v[e] <= G->E_v[e-1])) {
            printf("Graph generation problem - 4!!!\n"); 
            exit(0); 
          }
        }
      }
      return G;
    }
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   Terra Omega    6 年前

    我愚蠢地意识到,我忘记了超级计算机在命令行上执行程序时有线程限制,如果在“专用模式”下使用批处理文件执行,则可以超过大约4096个线程。在使用批处理文件进行测试之后,我意识到了我的方法中的错误,这确实是我的解决方案。抱歉给您带来不便,亚尔!希望这些信息能帮助将来的其他用户检查您的超级计算机在多线程计算方面的策略!感谢Giles让我检查错误代码,因为如果没有错误代码告诉我4096的线程“用完了”(尽管有65536个,哈哈)。一旦我能回答,我将结束这个问题。