代码之家  ›  专栏  ›  技术社区  ›  viraptor

同步线程启动

  •  2
  • viraptor  · 技术社区  · 6 年前

    我正在运行一些代码(简化版,但下面的版本仍处于中断状态),在等待第一次切换时,大约1/3000次执行失败。应该做的是:

    • threads[0] 启动并获取互斥锁
    • 线程[0] 通知 cond_main thread[1]
    • 线程[1] / thread[0] 在等待对方的信号时做一些工作

    不幸的是,它失败了 线程[0] - cond.wait 以超时结束并引发异常。我该如何同步,确保 主管道 不会太早收到信号吗?

    理想情况下,我希望从主线程传递一个锁定的互斥体,并在第一个生成的线程中解锁它,但是Ruby要求在同一个线程中解锁互斥体,所以这不起作用。

    自给自足的复制机(本身没有多大意义,但实际工作已被剥离):

    def run_test
      mutex     = Mutex.new
      cond      = ConditionVariable.new
      cond_main = ConditionVariable.new
      threads   = []
    
      t1_done = false
      t2_done = false
    
      threads << Thread.new do
        mutex.synchronize do
          # this needs to happen first
          cond_main.signal
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t2_done
    
          # some work
          t1_done = true
          cond.signal
        end
      end
      cond_main.wait(Mutex.new.lock, 2)
    
      threads << Thread.new do
        mutex.synchronize do
          cond.signal
          # some work
          t2_done = true
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t1_done
        end
      end
    
      threads.map(&:join)
    end
    
    5000.times { |x|
      puts "Run #{x}"
      run_test
    }
    

    在Ruby 2.5.3上测试

    1 回复  |  直到 6 年前
        1
  •  1
  •   Marcin Kołodziej    6 年前

    设置while块以在第二个线程完成时停止等待(请参阅更多信息) here ):

    def run_test
      mutex     = Mutex.new
      cond      = ConditionVariable.new
      cond_main = ConditionVariable.new
      threads   = []
    
      spawned = false
    
      t1_done = false
      t2_done = false
    
      threads << Thread.new do
        mutex.synchronize do
          while(!spawned) do
            cond.wait(mutex, 2)
          end
          raise 'timeout waiting for switch' if !t2_done
    
          # some work
          t1_done = true
          cond.signal
        end
      end
    
      threads << Thread.new do
        mutex.synchronize do
          spawned = true
          cond.signal
          # some work
          t2_done = true
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t1_done
        end
      end
    
      threads.map(&:join)
    end
    
    50000.times { |x| 
      puts x 
      run_test 
    }
    

    或者,使用 counting semaphore ,我们可以为线程分配一些优先级:

    require 'concurrent-ruby'
    
    def run_test
      mutex     = Mutex.new
      sync      = Concurrent::Semaphore.new(0)
      cond      = ConditionVariable.new
      cond_main = ConditionVariable.new
      threads   = []
    
      t1_done = false
      t2_done = false
    
      threads << Thread.new do
        mutex.synchronize do
          sync.release(1)
          # this needs to happen first
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t2_done
    
          # some work
          t1_done = true
          cond.signal
        end
      end
    
      threads << Thread.new do
        sync.acquire(1)
        mutex.synchronize do
          cond.signal
          # some work
          t2_done = true
          cond.wait(mutex, 2)
          raise 'timeout waiting for switch' if !t1_done
        end
      end
    
      threads.map(&:join)
    end
    
    50000.times { |x| 
      puts x 
      run_test 
    }
    

    奇怪的是,在Ruby 2.6上,您的代码似乎不会引发异常(测试运行次数超过1000万次)。