代码之家  ›  专栏  ›  技术社区  ›  Emixam23

可取消信号量lim-你能建议我的封装有什么改进吗?

  •  0
  • Emixam23  · 技术社区  · 6 年前

    SemaphoreSlim . (信号量限制消除(封装)

    这是我的班级:

    public class CancellableSemaphoreSlim
    {
        readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
        readonly SemaphoreSlim ss;
    
        /// <summary>
        /// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
        /// </summary>
        /// <param name="initialCount">Initial count.</param>
        public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }
    
        /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
        /// <returns>A task that will complete when the semaphore has been entered. </returns>
        /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
        /// <exception cref="T:System.OperationCanceledException" />
        public Task WaitAsync()
        {
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            tokens.Enqueue(cancellationTokenSource);
            return ss.WaitAsync(cancellationTokenSource.Token);
        }
    
        /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
        /// <returns>A task that will complete when the semaphore has been entered. </returns>
        /// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
        /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
        /// <exception cref="T:System.OperationCanceledException">
        ///     <paramref name="cancellationTokenSource" /> was canceled. 
        /// </exception>
        public Task WaitAsync(CancellationToken cancellationToken)
        {
            CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            tokens.Enqueue(cancellationTokenSource);
            return ss.WaitAsync(cancellationTokenSource.Token);
        }
    
        /// <summary>
        /// Release this instance.
        /// </summary>
        /// <returns>The released semaphore return.</returns>
        public int Release() => ss.Release();
    
        /// <summary>
        /// Cancel all processus currently in WaitAsync() state.
        /// </summary>
        public void CancelAll()
        {
            while (tokens.Count > 0)
            {
                CancellationTokenSource token = tokens.Dequeue();
                if (!token.IsCancellationRequested)
                    token.Cancel();
            }
        }
    }
    

    你可以把它当作基本的 信号量限制 ,我写了一个简单的例子:

    class Program
    {
        static void Main(string[] args)
        {
            AsyncContext.Run(() => MainAsync(args));
        }
    
        static async void MainAsync(string[] args)
        {
            for (int i = 0; i < 5; i++)
            {
                try
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(10000);
                    await Task.WhenAll(
                        MakeAnAction(i, cancellationTokenSource),
                        MakeAnAction(i, cancellationTokenSource),
                        MakeAnAction(i, cancellationTokenSource),
                        MakeAnAction(i, cancellationTokenSource),
                        MakeAnAction(i, cancellationTokenSource)
                        );
                }
                catch (OperationCanceledException) { }
            }
            await Task.Delay(5000);
            cancellableSemaphoreSlim.CancelAll();
            await Task.Delay(5000);
        }
    
        readonly static CancellableSemaphoreSlim cancellableSemaphoreSlim = new CancellableSemaphoreSlim(1);
        readonly static Random rnd = new Random();
    
        internal static async Task MakeAnAction(int id, CancellationTokenSource cancellationTokenSource)
        {
            try
            {
                await cancellableSemaphoreSlim.WaitAsync(cancellationTokenSource.Token);
                int actionTime = rnd.Next(2, 10) * 1000;
                Output($"{id} : Start ({actionTime})");
                await Task.Delay(actionTime, cancellationTokenSource.Token);
                Output($"{id} : OK ({actionTime})");
            }
            catch (OperationCanceledException)
            {
                Output($"{id} : Cancelled");
            }
            finally
            {
                cancellableSemaphoreSlim.Release();
            }
        }
    
        private static void Output(string str)
        {
            Debug.WriteLine(str);
            Console.WriteLine(str);
        }
    }
    

    不过,我想知道是否使用 Queue<CancellationTokenSource> 会产生任何异步问题吗?因为,如果我们有一个方法(makeAnAction-like),它可以被不同的线程/任务调用,如果 取消所有() 在新的任务/线程调用makeAnAction之前调用,这意味着这一个将被添加到队列中,该队列实际上正在使其所有项出列。。

    CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) . 但是,即使是varargs逻辑( params

    我只是想以一种不会失败的方式来实现它,但我想我目前的方法不太好,所以我想知道是否有人能提供我一个关于这种封装及其逻辑的观点?

    如果你觉得有些东西不合逻辑,请随时给我提建议:)

    马克斯

    编辑1

    然后我编辑了代码,以便跟随@NthDeveloper的讨论。我试着添加

    public class CancellableSemaphoreSlim
    {
        object _syncObj = new object();
        readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
        readonly SemaphoreSlim ss;
    
        /// <summary>
        /// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
        /// </summary>
        /// <param name="initialCount">Initial count.</param>
        public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }
    
        /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
        /// <returns>A task that will complete when the semaphore has been entered. </returns>
        /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
        /// <exception cref="T:System.OperationCanceledException" />
        public Task WaitAsync()
        {
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            lock (_syncObj)
            {
                tokens.Enqueue(cancellationTokenSource);
            }
            return ss.WaitAsync(cancellationTokenSource.Token);
        }
    
        /// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
        /// <returns>A task that will complete when the semaphore has been entered. </returns>
        /// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
        /// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
        /// <exception cref="T:System.OperationCanceledException">
        ///     <paramref name="cancellationTokenSource" /> was canceled. 
        /// </exception>
        public Task WaitAsync(CancellationToken cancellationToken)
        {
            CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            lock (_syncObj)
            {
                tokens.Enqueue(cancellationTokenSource);
            }
            return ss.WaitAsync(cancellationTokenSource.Token);
        }
    
        /// <summary>
        /// Release this instance.
        /// </summary>
        /// <returns>The released semaphore return.</returns>
        public int Release() => ss.Release();
    
        /// <summary>
        /// Cancel all processus currently in WaitAsync() state.
        /// </summary>
        public void CancelAll()
        {
            lock (_syncObj)
            {
                while (tokens.Count > 0)
                {
                    CancellationTokenSource token = tokens.Dequeue();
                    if (!token.IsCancellationRequested)
                        token.Cancel();
                }
            }
        }
    }
    
    2 回复  |  直到 6 年前
        1
  •  2
  •   Peter Wishart    6 年前

    我认为你只需使用一个 CancellationSource CancelAll :

    public class CancellableSemaphoreSlim
    {
        CancellationTokenSource cancelSource = new CancellationTokenSource();
        readonly SemaphoreSlim ss;
    
        public CancellableSemaphoreSlim(int initialCount) 
        { 
            ss = new SemaphoreSlim(initialCount); 
        }
    
        public Task WaitAsync() => ss.WaitAsync(cancelSource.Token);
    
        public Task WaitAsync(CancellationToken cancellationToken)
        {
            // This operation will cancel when either the user token or our cancelSource signal cancellation
            CancellationTokenSource linkedSource =  CancellationTokenSource.CreateLinkedTokenSource(cancelSource.Token, cancellationToken);
            return ss.WaitAsync(linkedSource.Token);
        }
    
        public int Release() => ss.Release();
    
        public void CancelAll()
        {
            var currentCancelSource = Interlocked.Exchange(ref cancelSource, new CancellationTokenSource());
            currentCancelSource.Cancel();
        }
    }
    

    WaitAsync 被呼叫取消 全部取消

    在这个版本中,它只取决于旧的还是新的 cancelSource.Token 被抓住了 WaitAsync()

        2
  •  0
  •   NthDeveloper    6 年前

    示例线程安全类,用于保护其内部列表不受同时更改的影响,并防止在释放类后使用该类。

    public class SampleThreadSafeDisposableClass: IDisposable
    {
        bool _isDisposed;
        object _syncObj = new object();
    
        List<object> _list = new List<object>();
    
        public void Add(object obj)
        {
            lock(_syncObj)
            {
                if (_isDisposed)
                    return;
    
                _list.Add(obj);
            }
        }       
    
        //This method can be Dispose/Clear/CancelAll
        public void Dispose()
        {
            lock (_syncObj)
            {
                if (_isDisposed)
                    return;
    
                _isDisposed = true;
    
                _list.Clear();
            }
        }
    }
    

    希望这有帮助。