代码之家  ›  专栏  ›  技术社区  ›  stephan

编译器优化中断多线程代码

  •  8
  • stephan  · 技术社区  · 14 年前

    在经历了艰苦的学习之后 shared variables are currently not guarded by memory barriers ,我现在遇到了另一个问题。要么我做错了什么,要么DMD中现有的编译器优化可以通过重新排序读取多线程代码 共享 变量。

    例如,当我使用 dmd -O (完全优化),编译器愉快地优化了本地变量 o 在这个代码中(其中 cas 比较和交换函数来自 core.atomic )

    shared uint cnt;
    void atomicInc  ( ) { uint o; do { o = cnt; } while ( !cas( &cnt, o, o + 1 ) );}
    

    类似于这样(请参见下面的dis assembly):

    shared uint cnt;
    void atomicInc  ( ) { while ( !cas( &cnt, cnt, cnt + 1 ) ) { } }
    

    在“优化”代码中 cnt 从内存中读取两次,因此运行另一个线程已修改的风险 碳纳米管 介于两者之间。优化基本上破坏了比较交换算法。

    这是一个错误,还是有一个正确的方法来达到预期的结果?到目前为止,我找到的唯一解决办法是使用汇编程序实现代码。

    完整测试代码和其他详细信息
    为了完整性,这里有一个完整的测试代码,它显示了这两个问题(没有内存障碍和优化问题)。它在三台不同的Windows机器上为dmd 2.049和dmd2.050生成以下输出(假设Dekker的算法没有死锁,这可能会发生):

    dmd -O -run optbug.d
    CAS   : failed
    Dekker: failed
    

    还有里面的回路 atomicInc 通过完全优化编译为:

    ; cnt is stored at 447C10h
    ; while ( !cas( &cnt, o, o + 1 ) ) o = cnt;
    ; 1) prepare call cas( &cnt, o, o + 1 ): &cnt and o go to stack, o+1 to eax
    402027: mov    ecx,447C10h         ; ecx = &cnt
    40202C: mov    eax,[447C10h]     ; eax = o1 = cnt
    402031: inc    eax                 ; eax = o1 + 1 (third parameter)
    402032: push   ecx                 ; push &cnt (first parameter)
        ; next instruction pushes current value of cnt onto stack
        ; as second parameter o instead of re-using o1
    402033: push   [447C10h]    
    402039: call   4020BC              ; 2) call cas    
    40203E: xor    al,1                ; 3) test success
    402040: jne    402027              ; no success try again
    ; end of main loop
    

    以下是测试代码:

    import core.atomic;
    import core.thread;
    import std.stdio;
    
    enum loops = 0xFFFF;
    shared uint cnt;
    
    /* *****************************************************************************
     Implement atomicOp!("+=")(cnt, 1U); with CAS. The code below doesn't work with
     the "-O" compiler flag because cnt is read twice while calling cas and another
     thread can modify cnt in between.
    */
    enum threads = 8;
    
    void atomicInc  ( ) { uint o; do { o = cnt; } while ( !cas( &cnt, o, o + 1 ) );}
    void threadFunc ( ) { foreach (i; 0..loops) atomicInc; }
    
    void testCas ( ) {
        cnt = 0;
        auto tgCas = new ThreadGroup;
        foreach (i; 0..threads) tgCas.create(&threadFunc);
        tgCas.joinAll;
        writeln( "CAS   : ", cnt == loops * threads ? "passed" : "failed" );
    }
    
    /* *****************************************************************************
     Dekker's algorithm. Fails on ia32 (other than atom) because ia32 can re-order 
     read before write. Most likely fails on many other architectures.
    */
    shared bool flag1 = false;
    shared bool flag2 = false;
    shared bool turn2 = false;   // avoids starvation by executing 1 and 2 in turns
    
    void dekkerInc ( ) {
        flag1 = true;
        while ( flag2 ) if ( turn2 ) {
            flag1 = false; while ( turn2 )  {  /* wait until my turn */ }
            flag1 = true;
        }
        cnt++;                   // shouldn't work without a cast
        turn2 = true; flag1 = false;
    }
    
    void dekkerDec ( ) {
        flag2 = true;
        while ( flag1 ) if ( !turn2 ) {
            flag2 = false; while ( !turn2 ) { /* wait until my turn */ }
            flag2 = true;
        }
        cnt--;                   // shouldn't work without a cast
        turn2 = false; flag2 = false;
    }
    
    void threadDekkerInc ( ) { foreach (i; 0..loops) dekkerInc; }
    void threadDekkerDec ( ) { foreach (i; 0..loops) dekkerDec; }
    
    void testDekker ( ) {
        cnt = 0;
        auto tgDekker = new ThreadGroup;
        tgDekker.create( &threadDekkerInc );
        tgDekker.create( &threadDekkerDec );
        tgDekker.joinAll;
        writeln( "Dekker: ", cnt == 0 ? "passed" : "failed" );
    }
    
    /* ************************************************************************** */
    void main() {
        testCas;
        testDekker;
    }
    
    2 回复  |  直到 12 年前
        1
  •  4
  •   stephan    12 年前

    虽然问题似乎仍然存在, core.atomic 现在暴露 atomicLoad 这使得解决方法相对简单。使 cas 示例工作,它足以加载 cnt 原子的:

    void atomicInc  ( ) { 
        uint o; 
        do {
             o = atomicLoad(cnt); 
        } while ( !cas( &cnt, o, o + 1 ) );
    }
    

    同样,要使Dekker的算法工作:

    // ...
    while ( atomicLoad(flag2) ) if ( turn2 ) {
    // ...
    while ( atomicLoad(flag1) ) if ( !turn2 ) {
    // ...
    

    对于ia32以外的架构(忽略字符串操作和SSE),也可以重新排序

    • 相对于读取的读取
    • 或相对于写入的写入
    • 或写入和读取到相同的内存位置

    需要额外的内存屏障。

        2
  •  3
  •   Timothy Baldridge    14 年前

    是的,用汇编语言编写。如果跳过使用cas()函数,只在程序集中编写整个atomicInt函数,那么只需要几行代码。在你这样做之前,你很可能会反对编译器的优化。

    最重要的是,您可以使用x86 LOCK INC指令而不是CAS,并且您应该能够将函数减少到程序集的一两行。