代码之家  ›  专栏  ›  技术社区  ›  Andreas Niedermair

如何实现自己的高级生产者/消费者方案?

  •  1
  • Andreas Niedermair  · 技术社区  · 14 年前

    注:


    我需要一个“强大的”队列,它提供以下功能:

    • 我对一组对象有一定的作用域。也就是说 , , ... 会有自己的队伍
    • 我正在组作用域线程中填充队列 (制作人)
    • 我正在读取组作用域线程中的队列 (消费者)

    1. 队列中没有项(因为作业是用空的“targetgroup”调用的): 应该逃出这个圈子
    2. 队列中当前没有项目,因为 螺纹A 正在处理要排队的项: 螺纹B
    3. 队列中有项: 螺纹B
    4. 队列中没有项目,因为 螺纹A 没有其他要排队的项目: 应该逃出这个圈子

    现在我提出了以下实施方案:

    public class MightyQueue<T>
      where T : class
    {
        private readonly Queue<T> _queue = new Queue<T>();
    
        private bool? _runable;
        private volatile bool _completed;
    
        public bool Runable
        {
            get
            {
                while (!this._runable.HasValue)
                {
                    Thread.Sleep(100);
                }
                return this._runable ?? false;
            }
            set
            {
                this._runable = value;
            }
        }
    
        public void Enqueue(T item)
        {
            if (item == null)
            {
                throw new ArgumentNullException("item");
            }
    
            this._queue.Enqueue(item);
        }
    
        public void CompleteAdding()
        {
            this._completed = true;
        }
    
        public bool TryDequeue(out T item)
        {
            if (!this.Runable)
            {
                item = null;
                return false;
            }
            while (this._queue.Count == 0)
            {
                if (this._completed)
                {
                    item = null;
                    return false;
                }
                Thread.Sleep(100);
            }
            item = this._queue.Dequeue();
            return true;
        }
    }
    

    生产者

    if (anythingToWorkOn)
    {
        myFooMightyQueueInstance.Runable = false;
    }
    else
    {
        myFooMightyQueueInstance.Runable = true;
        while (condition)
        {
            myFooMightyQueueInstance.Enqueue(item);
        }
        myFooMightyQueueInstance.CompleteAdding();
    }
    

    消费者

    if (!myFooMightyQueueInstance.Runable)
    {
        return;
    }
    
    T item;
    while (myFooMightyQueueInstance.TryDequeue(out item))
    {
        //work with item
    }
    

    但我相信这种方法是错误的 Thread.Sleep() -里面的东西(不能有侍应生之类的东西吗?)... 我也不在乎算法本身。。。 有人能帮我吗?

    5 回复  |  直到 9 年前
        1
  •  1
  •   vgru    14 年前

    您应该从一个通用的生产者-消费者队列开始并使用它。在队列中实现这一点并不是一个好主意,因为这会阻止您使用信号量来向线程发送信号(或者,您的队列中可能有公共信号量,但这是 坏主意)。

    一旦线程A将单个工作项排队,它就必须发出信号通知线程B。当线程B处理完所有项时,它应该发出信号通知其他所有人它已经完成。您的主线程应该在等待第二个信号量知道所有事情都完成了。

    [编辑]

    public interface IProducer<T> : IStoppable
    {
        /// <summary>
        /// Notifies clients when a new item is produced.
        /// </summary>
        event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
    }
    
    public interface IConsumer<T> : IStoppable
    {
        /// <summary>
        /// Performs processing of the specified item.
        /// </summary>
        /// <param name="item">The item.</param>
        void ConsumeItem(T item);
    }
    
    public interface IStoppable
    {
        void Stop();
    }
    

    所以,在您的情况下,创建邮件的类需要启动 ItemProduced ConsumeItem .

    然后将这两个实例传递给 Worker :

    public class Worker<T>
    {
        private readonly Object _lock = new Object();
        private readonly Queue<T> _queuedItems = new Queue<T>();
        private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
        private readonly IProducer<T> _producer;
        private readonly IConsumer<T> _consumer;
        private volatile bool _ending = false;
        private Thread _workerThread;
    
        public Worker(IProducer<T> producer, IConsumer<T> consumer)
        {
            _producer = producer;
            _consumer = consumer;
        }
    
        public void Start(ThreadPriority priority)
        {
            _producer.ItemProduced += Producer_ItemProduced;
            _ending = false;
    
            // start a new thread
            _workerThread = new Thread(new ThreadStart(WorkerLoop));
            _workerThread.IsBackground = true;
            _workerThread.Priority = priority;
            _workerThread.Start();
        } 
    
        public void Stop()
        {
            _producer.ItemProduced -= Producer_ItemProduced;
            _ending = true;
    
            // signal the consumer, in case it is idle
            _itemReadyEvt.Set();
            _workerThread.Join();
        }
    
        private void Producer_ItemProduced
             (object sender, ProducedItemEventArgs<T> e)
        {
            lock (_lock) { _queuedItems.Enqueue(e.Item); }
    
            // notify consumer thread
            _itemReadyEvt.Set();
        }
    
        private void WorkerLoop()
        {
            while (!_ending)
            {
                _itemReadyEvt.WaitOne(-1, false);
    
                T singleItem = default(T);
                lock (_lock)
                {
                   if (_queuedItems.Count > 0)
                   {
                       singleItem = _queuedItems.Dequeue();
                   }
                }
    
    
                while (singleItem != null)
                {
                    try
                    {
                        _consumer.ConsumeItem(singleItem);
                    }
                    catch (Exception ex)
                    {
                        // handle exception, fire an event
                        // or something. Otherwise this
                        // worker thread will die and you
                        // will have no idea what happened
                    }
    
                    lock (_lock)
                    {
                        if (_queuedItems.Count > 0)
                        {
                            singleItem = _queuedItems.Dequeue();
                        }
                    }
                }
    
             }
    
        } // WorkerLoop
    
    } // Worker
    

    要使用它,需要让类实现这两个接口:

    IProducer<IMail> mailCreator = new MailCreator();
    IConsumer<IMail> mailSender = new MailSender();
    
    Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
    worker.Start();
    
    // produce an item - worker will add it to the
    // queue and signal the background thread
    mailCreator.CreateSomeMail();
    
    // following line will block this (calling) thread
    // until all items are consumed
    worker.Stop();
    

    最棒的是:

    • 多个工人可以接受来自同一生产商的项目
    • 多个工作者可以将项目分派给同一个使用者(尽管这意味着您需要假设使用者是以线程安全的方式实现的)
        2
  •  2
  •   Marcelo Cantos    14 年前

    如果有.Net 4.0,请使用 BlockingCollection . 它通过 CompleteAdding 方法。

    如果你有一个早期的.Net,那么升级(即,我懒得解释如何实现已经为你做过的事情)

    编辑:

        3
  •  1
  •   MatthiasG    14 年前

    我写了一个简单的例子,对我来说很好,应该适合你的场景。如果用户正在运行取决于运行变量是如何设置的,但是您可以轻松地将其修改为更复杂的条件,比如“如果没有邮件存在,但有人说我应该等待更多”。

    public class MailSystem
    {
        private readonly Queue<Mail> mailQueue = new Queue<Mail>();
        private bool running;
        private Thread consumerThread;
    
        public static void Main(string[] args)
        {
            MailSystem mailSystem = new MailSystem();
            mailSystem.StartSystem();
        }
    
        public void StartSystem()
        {
            // init consumer
            running = true;
            consumerThread = new Thread(ProcessMails);
            consumerThread.Start();
            // add some mails
            mailQueue.Enqueue(new Mail("Mail 1"));
            mailQueue.Enqueue(new Mail("Mail 2"));
            mailQueue.Enqueue(new Mail("Mail 3"));
            mailQueue.Enqueue(new Mail("Mail 4"));
            Console.WriteLine("producer finished, hit enter to stop consumer");
            // wait for user interaction
            Console.ReadLine();
            // exit the consumer
            running = false;
            Console.WriteLine("exited");
        }
    
        private void ProcessMails()
        {
            while (running)
            {
                if (mailQueue.Count > 0)
                {
                    Mail mail = mailQueue.Dequeue();
                    Console.WriteLine(mail.Text);
                    Thread.Sleep(2000);
                }
            }
        }
    }
    
    internal class Mail
    {
        public string Text { get; set; }
    
        public Mail(string text)
        {
            Text = text;
        }
    }
    
        4
  •  1
  •   dutt    14 年前

    你想要的可以用条件变量来完成。我将编写一个伪代码示例,应该不会太难实现。

    一根线有如下线索:

    while(run)
      compose message
      conditionvariable.lock()
      add message to queue
      conditionvariable.notifyOne()
      conditionvariable.release()
    

    而另一条线有这样的线索

    while(threadsafe_do_run())
      while threadsafe_queue_empty()
           conditionvariable.wait()
      msg = queue.pop()
      if msg == "die"
          set_run(false)
      conditionvariable.release()
      send msg
    

    所以如果你没有收到任何信息,就推一条死亡信息。当所有的信息都被处理过的时候。

    调用notifyOne()时wait()返回,然后队列有消息要发送。在大多数框架中,conditionvariable已经拥有锁,您可能需要在.NET中自己添加lock语句。

        5
  •  0
  •   Shiv Kumar    14 年前

    我会用一个线程来完成整个过程。即生成邮件正文并发送。只是因为生成邮件正文不需要时间,但是发送电子邮件需要时间。

    因此,可以在每个线程执行其任务的地方启动多个线程(在数量上保持上限)。如果您使用sat处理作业(数据)的集合,那么您可以并行化数据(即将数据分割成与系统上的核心数匹配的块,例如作业的快照(线程)。

    使用任务将使所有这一切变得更加简单,无论你走哪条路,即2个线程发送一封电子邮件或一个线程完成整个工作,但使用多个线程并行完成多个工作。