代码之家  ›  专栏  ›  技术社区  ›  Thomas

如何加速我的稀疏矩阵求解?

  •  17
  • Thomas  · 技术社区  · 14 年前

    我正在使用高斯-赛德尔方法编写稀疏矩阵解算器。通过分析,我已经确定我的程序大约一半的时间都花在求解器中。性能关键部分如下:

    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                    - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
    

    所有涉及的数组都是 float 类型。实际上,它们不是数组,而是具有 [] 运算符,我认为应该进行优化,但定义如下:

    inline float &operator[](size_t i) { return d_cells[i]; }
    inline float const &operator[](size_t i) const { return d_cells[i]; }
    

    为了 d_nx = d_ny = 128 ,在Intel i7 920上每秒可运行约3500次。这意味着内环体每秒运行3500*128*128=5700万次。由于只涉及一些简单的算术运算,对于2.66GHz处理器来说,这是一个很低的数字。

    也许它不受CPU功率的限制,而是受内存带宽的限制?好吧,一个128*128 浮动 阵列占用65KB,因此所有6个阵列都应该很容易地放入CPU的L3缓存(即8MB)。假设寄存器中没有缓存任何内容,我在内部循环体中计算15个内存访问。在64位系统上,每次迭代都有120字节,所以5700万*120字节=6.8GB/s。L3缓存以2.66GHz运行,所以它的数量级是相同的。我想记忆确实是瓶颈。

    为了加快速度,我尝试了以下方法:

    • 编译时使用 g++ -O3 . (好吧,我从一开始就这么做了。)

    • 使用openmp pragma并行处理4个内核。我必须改为Jacobi算法,以避免读取和写入同一数组。这需要我做两倍的迭代,从而得到大约相同速度的净结果。

    • 处理循环体的实现细节,例如使用指针而不是索引。没有效果。

    什么是最好的方法来加速这个家伙?是否有助于在组装过程中重写内体(我必须先学习这一点)?我应该在GPU上运行它吗(我知道怎么做,但这很麻烦)?还有什么好主意吗?

    (注意事项) 以“否”作为答案,例如:“它不能做得更快,因为……”

    更新 :根据要求,这里有一个完整的程序:

    #include <iostream>
    #include <cstdlib>
    #include <cstring>
    
    using namespace std;
    
    size_t d_nx = 128, d_ny = 128;
    float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;
    
    void step() {
        size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
        for (size_t y = 1; y < d_ny - 1; ++y) {
            for (size_t x = 1; x < d_nx - 1; ++x) {
                d_x[ic] = d_b[ic]
                    - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
                ++ic; ++iw; ++ie; ++is; ++in;
            }
            ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        }
    }
    
    void solve(size_t iters) {
        for (size_t i = 0; i < iters; ++i) {
            step();
        }
    }
    
    void clear(float *a) {
        memset(a, 0, d_nx * d_ny * sizeof(float));
    }
    
    int main(int argc, char **argv) {
        size_t n = d_nx * d_ny;
        d_x = new float[n]; clear(d_x);
        d_b = new float[n]; clear(d_b);
        d_w = new float[n]; clear(d_w);
        d_e = new float[n]; clear(d_e);
        d_s = new float[n]; clear(d_s);
        d_n = new float[n]; clear(d_n);
        solve(atoi(argv[1]));
        cout << d_x[0] << endl; // prevent the thing from being optimized away
    }
    

    我编译并运行如下:

    $ g++ -o gstest -O3 gstest.cpp
    $ time ./gstest 8000
    0
    
    real    0m1.052s
    user    0m1.050s
    sys     0m0.010s
    

    (它每秒钟做8000次而不是3500次迭代,因为我的“真实”程序也做了很多其他的事情。但它是代表性的。)

    更新2 :我被告知,UnitInitialized值可能不具有代表性,因为NaN和Inf值可能会降低速度。现在清除示例代码中的内存。不过,这对我的执行速度没什么影响。

    7 回复  |  直到 14 年前
        1
  •  5
  •   celion    14 年前

    两种想法:

    1. 使用SIMD。您可以从每个数组中一次将4个浮点加载到一个SIMD寄存器(例如,Intel上的SSE、PowerPC上的VMX)。这样做的缺点是,某些d_x值将“过时”,因此您的收敛速度将受到影响(但不会像Jacobi迭代那样糟糕);很难说加速是否会抵消它。

    2. 使用 SOR . 它很简单,不需要增加太多的计算,而且可以很好地提高收敛速度,即使对于相对保守的松弛值(比如1.5)。

    3. 使用共轭梯度。如果这是流体模拟的投影步骤(即强制不可压缩性),您应该能够应用cg并获得更好的收敛速度。一个好的先决条件会有更多的帮助。

    4. 使用专门的解算器。如果线性系统是由 Poisson equation 你甚至可以用基于FFT的方法做得比共轭梯度更好。

    如果你能解释更多关于你试图解决的系统是什么样子的,我可能会给出更多关于3和4的建议。

        2
  •  5
  •   Poni    14 年前

    我想我已经设法优化了它,这里有一个代码,用VC++创建一个新项目,添加这个代码,然后在“release”下简单地编译。

    #include <iostream>
    #include <cstdlib>
    #include <cstring>
    
    #define _WIN32_WINNT 0x0400
    #define WIN32_LEAN_AND_MEAN
    #include <windows.h>
    
    #include <conio.h>
    
    using namespace std;
    
    size_t d_nx = 128, d_ny = 128;
    float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;
    
    void step_original() {
        size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
        for (size_t y = 1; y < d_ny - 1; ++y) {
            for (size_t x = 1; x < d_nx - 1; ++x) {
                d_x[ic] = d_b[ic]
                    - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
                ++ic; ++iw; ++ie; ++is; ++in;
            }
            ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        }
    }
    void step_new() {
        //size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
        float
            *d_b_ic,
            *d_w_ic,
            *d_e_ic,
            *d_x_ic,
            *d_x_iw,
            *d_x_ie,
            *d_x_is,
            *d_x_in,
            *d_n_ic,
            *d_s_ic;
    
        d_b_ic = d_b;
        d_w_ic = d_w;
        d_e_ic = d_e;
        d_x_ic = d_x;
        d_x_iw = d_x;
        d_x_ie = d_x;
        d_x_is = d_x;
        d_x_in = d_x;
        d_n_ic = d_n;
        d_s_ic = d_s;
    
        for (size_t y = 1; y < d_ny - 1; ++y)
        {
            for (size_t x = 1; x < d_nx - 1; ++x)
            {
                /*d_x[ic] = d_b[ic]
                    - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/
                *d_x_ic = *d_b_ic
                    - *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie
                    - *d_s_ic * *d_x_is - *d_n_ic * *d_x_in;
                //++ic; ++iw; ++ie; ++is; ++in;
                d_b_ic++;
                d_w_ic++;
                d_e_ic++;
                d_x_ic++;
                d_x_iw++;
                d_x_ie++;
                d_x_is++;
                d_x_in++;
                d_n_ic++;
                d_s_ic++;
            }
            //ic += 2; iw += 2; ie += 2; is += 2; in += 2;
            d_b_ic += 2;
            d_w_ic += 2;
            d_e_ic += 2;
            d_x_ic += 2;
            d_x_iw += 2;
            d_x_ie += 2;
            d_x_is += 2;
            d_x_in += 2;
            d_n_ic += 2;
            d_s_ic += 2;
        }
    }
    
    void solve_original(size_t iters) {
        for (size_t i = 0; i < iters; ++i) {
            step_original();
        }
    }
    void solve_new(size_t iters) {
        for (size_t i = 0; i < iters; ++i) {
            step_new();
        }
    }
    
    void clear(float *a) {
        memset(a, 0, d_nx * d_ny * sizeof(float));
    }
    
    int main(int argc, char **argv) {
        size_t n = d_nx * d_ny;
        d_x = new float[n]; clear(d_x);
        d_b = new float[n]; clear(d_b);
        d_w = new float[n]; clear(d_w);
        d_e = new float[n]; clear(d_e);
        d_s = new float[n]; clear(d_s);
        d_n = new float[n]; clear(d_n);
    
        if(argc < 3)
            printf("app.exe (x)iters (o/n)algo\n");
    
        bool bOriginalStep = (argv[2][0] == 'o');
        size_t iters = atoi(argv[1]);
    
        /*printf("Press any key to start!");
        _getch();
        printf(" Running speed test..\n");*/
    
        __int64 freq, start, end, diff;
        if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq))
            throw "Not supported!";
        freq /= 1000000; // microseconds!
        {
            ::QueryPerformanceCounter((LARGE_INTEGER*)&start);
            if(bOriginalStep)
                solve_original(iters);
            else
                solve_new(iters);
            ::QueryPerformanceCounter((LARGE_INTEGER*)&end);
            diff = (end - start) / freq;
        }
        printf("Speed (%s)\t\t: %u\n", (bOriginalStep ? "original" : "new"), diff);
        //_getch();
    
    
        //cout << d_x[0] << endl; // prevent the thing from being optimized away
    }
    

    像这样运行:

    app.exe 10000欧

    app.exe 10000 N

    “O”代表旧代码,你的。

    “N”是我的,新的。

    我的结果: 速度(原始):

    1515028年

    1523171个

    1495988年

    速度(新):

    九十六万六千零一十二

    九十八万四千一百一十

    1006045个

    提高了30%左右。

    背后的逻辑: 您一直在使用索引计数器进行访问/操作。 我用指针。 运行时,在VC++的调试器中的某个计算代码行处断点,然后按F8。你会看到分解器窗口。 您将看到生成的操作码(汇编代码)。

    不管怎样,看:

    int*x=…;

    X[3]=123;

    这告诉PC将指针x放在寄存器(比如eax)上。 加上它(3*sizeof(int))。 只有这样,才能将该值设置为123。

    正如您所理解的,指针方法要好得多,因为我们减少了添加过程,实际上我们自己处理它,从而能够根据需要进行优化。

    我希望这有帮助。

    stackoverflow.com员工的旁注: 很好的网站,希望我早就听说了!

        3
  •  3
  •   Community PPrice    7 年前

    首先,这里似乎有一个管道问题。循环从中的值读取 d_x 这已经被写入,但显然它必须等待写入完成。只需重新排列计算顺序,在等待时做一些有用的事情,就可以使计算速度快两倍:

    d_x[ic] = d_b[ic]
                            - d_e[ic] * d_x[ie]
        - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]
        - d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;
    

    它是 Eamon Nerbonne 谁知道的。很多人都支持他!我永远也猜不到。

        4
  •  2
  •   Rex Kerr    14 年前

    波尼的回答对我来说是正确的。

    我只是想指出,在这类问题中,你 经常 从记忆位置中获益。现在, b,w,e,s,n 数组都在内存中的不同位置。如果可以的话 在三级缓存中解决这个问题(主要是在二级缓存中),那么这将是不好的,这种解决方案将很有帮助:

    size_t d_nx = 128, d_ny = 128;
    float *d_x;
    
    struct D { float b,w,e,s,n; };
    D *d;
    
    void step() {
        size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
        for (size_t y = 1; y < d_ny - 1; ++y) {
            for (size_t x = 1; x < d_nx - 1; ++x) {
                d_x[ic] = d[ic].b
                    - d[ic].w * d_x[iw] - d[ic].e * d_x[ie]
                    - d[ic].s * d_x[is] - d[ic].n * d_x[in];
                ++ic; ++iw; ++ie; ++is; ++in;
            }
            ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        }
    }
    void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); }
    void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); }
    
    int main(int argc, char **argv) {
        size_t n = d_nx * d_ny;
        d_x = new float[n]; clear(d_x);
        d = new D[n]; memset(d,0,n * sizeof(D));
        solve(atoi(argv[1]));
        cout << d_x[0] << endl; // prevent the thing from being optimized away
    }
    

    例如,1280x1280的解决方案略小于2x 更快 比Poni的解决方案(在我的测试中是13秒对23秒——最初的实现是22秒),而在128x128时是30% 更慢的 (7秒对10秒——你的原稿是10秒)。

    (基本情况下迭代扩展到80000次,而更大的1280x1280次则扩展到800次。)

        5
  •  1
  •   miked    14 年前

    我认为你认为记忆成为瓶颈是对的。这是一个非常简单的循环,每次迭代只需要一些简单的算术运算。IC、IW、IE、IS和IN索引似乎位于矩阵的另一边,所以我猜想这里有一堆缓存未命中。

        6
  •  1
  •   Roberto Bonvallet    14 年前

    我不是这个问题的专家,但我已经看到了 there are several academic papers 提高高斯-赛德尔方法的缓存使用率。

    另一种可能的优化方法是使用红黑变种,在这种变种中,点以棋盘状的模式在两次扫描中更新。这样,扫描中的所有更新都是独立的,并且可以并行化。

        7
  •  0
  •   Thomas Matthews    14 年前

    我建议输入一些预取语句,并研究“面向数据的设计”:

    void step_original() {
        size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
        float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic;
        float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic;
        for (size_t y = 1; y < d_ny - 1; ++y) {
            for (size_t x = 1; x < d_nx - 1; ++x) {
    // Perform the prefetch
    // Sorting these statements by array may increase speed;
    //    although sorting by index name may increase speed too.
                db_ic = d_b[ic];
                dw_ic = d_w[ic];
                dx_iw = d_x[iw];
                de_ic = d_e[ic];
                dx_ie = d_x[ie];
                ds_ic = d_s[ic];
                dx_is = d_x[is];
                dn_ic = d_n[ic];
                dx_in = d_x[in];
    // Calculate
                d_x[ic] = db_ic
                    - dw_ic * dx_iw - de_ic * dx_ie
                    - ds_ic * dx_is - dn_ic * dx_in;
                ++ic; ++iw; ++ie; ++is; ++in;
            }
            ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        }
    }
    

    这与第二种方法不同,因为在执行计算之前,值被复制到局部临时变量。