代码之家  ›  专栏  ›  技术社区  ›  Chris McCall

在引发事件时,我应该如何实现“安静期”?

  •  5
  • Chris McCall  · 技术社区  · 14 年前

    我正在使用订阅服务器/通知程序模式从我的.NET中间层在C_中引发和使用事件。例如,当从导入文件的批处理程序中持久化数据时,一些事件会以“突发”的形式出现。这将执行一个可能长期运行的任务,我希望通过实现一个“静默期”来避免每秒触发事件几次,在静默期中,事件系统将一直等待,直到事件流速度减慢以处理事件。

    当发布服务器在通知订阅服务器时扮演主动角色时,我应该如何做?我不想等到有一件事发生时再去看看是否还有其他人在等待安静的时间…

    目前没有用于轮询订阅模型的主机进程。我应该放弃发布/订阅模式还是有更好的方法?

    3 回复  |  直到 14 年前
        1
  •  1
  •   Dr. Wily's Apprentice    14 年前

    下面是一个粗略的实现,它可能会为您指明一个方向。在我的示例中,涉及通知的任务是保存数据对象。保存对象时,将引发保存的事件。除了一个简单的保存方法外,我还实现了beginsave和endsave方法,以及与这两个方法一起用于批保存的保存过载。调用endsave时,将激发单个batchsave事件。

    显然,你可以根据自己的需要修改这个。在我的示例中,我跟踪了在批处理操作期间保存的所有对象的列表,但这可能不是您需要做的事情……您可能只关心保存了多少对象,甚至只关心批处理保存操作已完成。如果您预期会保存大量对象,那么将它们存储在列表中(如我的示例中所示)可能会成为内存问题。

    编辑:我在我的示例中添加了一个“阈值”概念,它试图防止在内存中保存大量对象。但是,这会导致batchsaved事件更频繁地触发。我还添加了一些锁来解决潜在的线程安全问题,尽管我可能遗漏了一些东西。

    class DataConcierge<T>
    {
        // *************************
        // Simple save functionality
        // *************************
    
        public void Save(T dataObject)
        {
            // perform save logic
    
            this.OnSaved(dataObject);
        }
    
        public event DataObjectSaved<T> Saved;
    
        protected void OnSaved(T dataObject)
        {
            var saved = this.Saved;
            if (saved != null)
                saved(this, new DataObjectEventArgs<T>(dataObject));
        }
    
        // ************************
        // Batch save functionality
        // ************************
    
        Dictionary<BatchToken, List<T>> _BatchSavedDataObjects = new Dictionary<BatchToken, List<T>>();
        System.Threading.ReaderWriterLockSlim _BatchSavedDataObjectsLock = new System.Threading.ReaderWriterLockSlim();
    
        int _SavedObjectThreshold = 17; // if the number of objects being stored for a batch reaches this threshold, then those objects are to be cleared from the list.
    
        public BatchToken BeginSave()
        {
            // create a batch token to represent this batch
            BatchToken token = new BatchToken();
    
            _BatchSavedDataObjectsLock.EnterWriteLock();
            try
            {
                _BatchSavedDataObjects.Add(token, new List<T>());
            }
            finally
            {
                _BatchSavedDataObjectsLock.ExitWriteLock();
            }
            return token;
        }
    
        public void EndSave(BatchToken token)
        {
            List<T> batchSavedDataObjects;
            _BatchSavedDataObjectsLock.EnterWriteLock();
            try
            {
                if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                    throw new ArgumentException("The BatchToken is expired or invalid.", "token");
    
                this.OnBatchSaved(batchSavedDataObjects); // this causes a single BatchSaved event to be fired
    
                if (!_BatchSavedDataObjects.Remove(token))
                    throw new ArgumentException("The BatchToken is expired or invalid.", "token");
            }
            finally
            {
                _BatchSavedDataObjectsLock.ExitWriteLock();
            }
        }
    
        public void Save(BatchToken token, T dataObject)
        {
            List<T> batchSavedDataObjects;
            // the read lock prevents EndSave from executing before this Save method has a chance to finish executing
            _BatchSavedDataObjectsLock.EnterReadLock();
            try
            {
                if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                    throw new ArgumentException("The BatchToken is expired or invalid.", "token");
    
                // perform save logic
    
                this.OnBatchSaved(batchSavedDataObjects, dataObject);
            }
            finally
            {
                _BatchSavedDataObjectsLock.ExitReadLock();
            }
        }
    
        public event BatchDataObjectSaved<T> BatchSaved;
    
        protected void OnBatchSaved(List<T> batchSavedDataObjects)
        {
            lock (batchSavedDataObjects)
            {
                var batchSaved = this.BatchSaved;
                if (batchSaved != null)
                    batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects));
            }
        }
    
        protected void OnBatchSaved(List<T> batchSavedDataObjects, T savedDataObject)
        {
            // add the data object to the list storing the data objects that have been saved for this batch
            lock (batchSavedDataObjects)
            {
                batchSavedDataObjects.Add(savedDataObject);
    
                // if the threshold has been reached
                if (_SavedObjectThreshold > 0 && batchSavedDataObjects.Count >= _SavedObjectThreshold)
                {
                    // then raise the BatchSaved event with the data objects that we currently have
                    var batchSaved = this.BatchSaved;
                    if (batchSaved != null)
                        batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects.ToArray()));
    
                    // and clear the list to ensure that we are not holding on to the data objects unnecessarily
                    batchSavedDataObjects.Clear();
                }
            }
        }
    }
    
    class BatchToken
    {
        static int _LastId = 0;
        static object _IdLock = new object();
    
        static int GetNextId()
        {
            lock (_IdLock)
            {
                return ++_LastId;
            }
        }
    
        public BatchToken()
        {
            this.Id = GetNextId();
        }
    
        public int Id { get; private set; }
    }
    
    class DataObjectEventArgs<T> : EventArgs
    {
        public T DataObject { get; private set; }
    
        public DataObjectEventArgs(T dataObject)
        {
            this.DataObject = dataObject;
        }
    }
    
    delegate void DataObjectSaved<T>(object sender, DataObjectEventArgs<T> e);
    
    class BatchDataObjectEventArgs<T> : EventArgs
    {
        public IEnumerable<T> DataObjects { get; private set; }
    
        public BatchDataObjectEventArgs(IEnumerable<T> dataObjects)
        {
            this.DataObjects = dataObjects;
        }
    }
    
    delegate void BatchDataObjectSaved<T>(object sender, BatchDataObjectEventArgs<T> e);
    

    在我的示例中,我选择使用令牌概念来创建单独的批。这允许在不同线程上运行的较小批处理操作完成并引发事件,而不需要等待较大的批处理操作完成。

    我做了单独的事件:保存和批保存。然而,这些可以很容易地合并为一个事件。

    编辑:修正了StevenSudit在访问赛事代表时指出的比赛条件。

    编辑:在我的示例中,修改了锁定代码以使用readerwriterlockslim而不是monitor(即“lock”语句)。我认为在save和endsave方法之间存在一些竞争条件。可以执行endsave,从而从字典中删除数据对象列表。如果save方法同时在另一个线程上执行,则可以将数据对象添加到该列表中,即使该对象已从字典中删除。

    在我修改过的示例中,这种情况不会发生,如果save方法在endsave之后执行,它将抛出异常。这些比赛条件主要是由于我试图避免我认为不必要的锁定。我意识到在一个锁中需要更多的代码,但是决定使用readerwriterlockslim而不是monitor,因为我只想防止save和endsave同时执行;不需要防止多个线程同时执行save。注意,monitor仍然用于同步对从字典中检索到的特定数据对象列表的访问。

    编辑:添加用法示例

    下面是上述示例代码的用法示例。

        static void DataConcierge_Saved(object sender, DataObjectEventArgs<Program.Customer> e)
        {
            Console.WriteLine("DataConcierge<Customer>.Saved");
        }
    
        static void DataConcierge_BatchSaved(object sender, BatchDataObjectEventArgs<Program.Customer> e)
        {
            Console.WriteLine("DataConcierge<Customer>.BatchSaved: {0}", e.DataObjects.Count());
        }
    
        static void Main(string[] args)
        {
            DataConcierge<Customer> dc = new DataConcierge<Customer>();
            dc.Saved += new DataObjectSaved<Customer>(DataConcierge_Saved);
            dc.BatchSaved += new BatchDataObjectSaved<Customer>(DataConcierge_BatchSaved);
    
            var token = dc.BeginSave();
            try
            {
                for (int i = 0; i < 100; i++)
                {
                    var c = new Customer();
                    // ...
                    dc.Save(token, c);
                }
            }
            finally
            {
                dc.EndSave(token);
            }
        }
    

    这导致了以下输出:

    dataconcierege<customer>。保存的批数:17

    dataconcierege<customer>。保存的批数:17

    dataconcierege<customer>。保存的批数:17

    dataconcierege<customer>。保存的批数:17

    dataconcierege<customer>。保存的批数:17

    dataconcierege<customer>。保存的批数:15

    在我的示例中,阈值设置为17,因此一批100个项目会导致batchsaved事件触发6次。

        2
  •  1
  •   Grzenio    14 年前

    我不确定我是否正确理解了你的问题,但我会尝试从源头上解决这个问题-确保事件不会以“爆发”的形式出现。您可以考虑实现批处理操作,这可以从文件导入程序中使用。在你的中间层,这将被视为一个单独的事件,并引发一个单独的事件。

    我认为,如果您不能进行上面概述的更改,那么实现一些合理的解决方案将非常困难——您可以尝试将发布服务器包装在“缓存”发布服务器中,这样可以实现一些启发式方法来缓存突发事件。最简单的方法是,如果当前正在处理另一个相同类型的事件(因此您的批处理至少会导致两个事件-一个在最开始,一个在结束)。您可以等待很短的时间,并且只在下一个事件在这段时间内没有出现时引发事件,但是即使管道中只有一个事件,您也会得到一个时间延迟。您还需要确保将不时地引发事件,即使事件队列不断,否则发布者可能会挨饿。

    第二个选项很难实现,并且将包含启发式方法,这可能会出错…

        3
  •  0
  •   batwad    14 年前

    有一个想法刚从我脑子里掉出来。我不知道它有多可行,也看不到一个明显的方法使它更通用,但它可能是一个开始。它所做的只是为按钮单击事件提供一个缓冲区(根据需要替换为事件)。

    class ButtonClickBuffer
    {
        public event EventHandler BufferedClick;
    
        public ButtonClickBuffer(Button button, int queueSize)
        {
            this.queueSize= queueSize;
            button.Click += this.button_Click;
        }
    
        private int queueSize;
        private List<EventArgs> queuedEvents = new List<EventArgs>();
    
        private void button_Click(object sender, EventArgs e)
        {
            queuedEvents.Add(e);
            if (queuedEvents.Count >= queueSize)
            {
                if (this.BufferedClick!= null)
                {
                    foreach (var args in this.queuedEvents)
                    {
                        this.BufferedClick(sender, args);
                    }
                    queuedEvents.Clear();
                }
            }
        }
    }
    

    因此,您的订阅服务器,而不是订阅为:

    this.button1.Click += this.button1_Click;
    

    将使用缓冲区,指定要等待的事件数:

    ButtonClickBuffer buffer = new ButtonClickBuffer(this.button1, 5);
    buffer.BufferedClick += this.button1_Click;
    

    它在一个简单的测试形式下工作,我敲了敲,但它远远没有生产准备好!

    你说你不想等待一个事件来查看是否有一个队列在等待,这正是它所做的。您可以替换缓冲区内的逻辑,以生成一个新线程,该线程监视队列并根据需要分派事件。天知道线程和锁定问题会从中产生什么!