代码之家  ›  专栏  ›  技术社区  ›  Ugur

了解多任务的异步/等待模式

  •  -1
  • Ugur  · 技术社区  · 7 年前

    目前,我很难理解使用异步和等待模式的多任务处理。为了获得一些基础知识,我编写了以下测试用例;

    public partial class MainWindow : Window
    {
    
        public MainWindow()
        {
            InitializeComponent();
        }
    
        private int global_int = 10;
        public async Task<int> RunAsyncTask()
        {
           // This method runs asynchronously.
           await Task.Run(() => Calculate());
           return global_int;
        }
    
        private int Calculate()
        {
            Console.WriteLine("Ticket count: " + --global_int);
            return global_int;
        }
    
        private async void  Start_Button_Click(object sender, RoutedEventArgs e)
        {
            List<Task<int>> list = new List<Task<int>>(); 
    
            Console.WriteLine("\nReseting: " );
            global_int = 10;
    
            for (int i = 0; i < 10; i++)
            {
                var task = RunAsyncTask();
                list.Add(task); 
            }
    
           await Task.WhenAll(list.ToArray<Task<int>>()); 
            
           Console.WriteLine("\nFinished: " + global_int);
    
        }
    }
    

    10个顾客,10张票,每个顾客买一张票,最后就没有可用的票了。

    问题:

    当我运行代码时,我实际上得到的结果并不总是相同的(总是期望0票)。实际问题在哪里?

    那么,我如何才能以一种结果总是相同的方式编写代码呢。

    输出1:

    Reseting: 
    Ticket count: 9
    Ticket count: 8
    Ticket count: 8
    Ticket count: 7
    Ticket count: 5
    Ticket count: 6
    Ticket count: 4
    Ticket count: 3
    Ticket count: 2
    Ticket count: 1
    
    Finished: 1
    

    输出2:

    Reseting: 
    Ticket count: 9
    Ticket count: 8
    Ticket count: 7
    Ticket count: 6
    Ticket count: 5
    Ticket count: 4
    Ticket count: 3
    Ticket count: 2
    Ticket count: 1
    Ticket count: 0
    
    Finished: 0
    
    2 回复  |  直到 4 年前
        1
  •  6
  •   Jonesopolis    7 年前
    --global_int
    

    这不是线程安全操作。正在读取和写入多个线程 global_int ,导致比赛状态。有一个方便的类叫做 Interlocked Calculate 方法:

    Console.WriteLine("Ticket count: " + Interlocked.Decrement(ref global_int);
    
        2
  •  3
  •   BlueMonkMN    7 年前

    如果您想了解如何在仍然使用异步模式的情况下以单线程方式调度任务,您可能会对这段代码感兴趣。

    class Program
    {
       static void Main(string[] args)
       {
         InitiateCalculations().Wait();
         Console.WriteLine("Finished: {0}", global_int);
       }
    
       // LimitedConcurrencyLevelTaskScheduler from
       // https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler
       // Provides a task scheduler that ensures a maximum concurrency level while 
       // running on top of the thread pool.
       public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
       {
          public static TaskFactory SingleFactory { get; private set; }
    
          static LimitedConcurrencyLevelTaskScheduler()
          {
             SingleFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(1));
          }
    
          // Indicates whether the current thread is processing work items.
          [ThreadStatic]
          private static bool _currentThreadIsProcessingItems;
    
          // The list of tasks to be executed 
          private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
    
          // The maximum concurrency level allowed by this scheduler. 
          private readonly int _maxDegreeOfParallelism;
    
          // Indicates whether the scheduler is currently processing work items. 
          private int _delegatesQueuedOrRunning = 0;
    
          // Creates a new instance with the specified degree of parallelism. 
          public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
          {
             if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
             _maxDegreeOfParallelism = maxDegreeOfParallelism;
          }
    
          // Queues a task to the scheduler. 
          protected sealed override void QueueTask(Task task)
          {
             // Add the task to the list of tasks to be processed.  If there aren't enough 
             // delegates currently queued or running to process tasks, schedule another. 
             lock (_tasks)
             {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                   ++_delegatesQueuedOrRunning;
                   NotifyThreadPoolOfPendingWork();
                }
             }
          }
          // Inform the ThreadPool that there's work to be executed for this scheduler. 
          private void NotifyThreadPoolOfPendingWork()
          {
             ThreadPool.UnsafeQueueUserWorkItem(_ =>
             {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                   // Process all available items in the queue.
                   while (true)
                   {
                      Task item;
                      lock (_tasks)
                      {
                         // When there are no more items to be processed,
                         // note that we're done processing, and get out.
                         if (_tasks.Count == 0)
                         {
                            --_delegatesQueuedOrRunning;
                            break;
                         }
    
                         // Get the next item from the queue
                         item = _tasks.First.Value;
                         _tasks.RemoveFirst();
                      }
    
                      // Execute the task we pulled out of the queue
                      base.TryExecuteTask(item);
                   }
                }
                // We're done processing items on the current thread
                finally { _currentThreadIsProcessingItems = false; }
             }, null);
          }
    
          // Attempts to execute the specified task on the current thread. 
          protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
          {
             // If this thread isn't already processing a task, we don't support inlining
             if (!_currentThreadIsProcessingItems) return false;
    
             // If the task was previously queued, remove it from the queue
             if (taskWasPreviouslyQueued)
                // Try to run the task. 
                if (TryDequeue(task))
                   return base.TryExecuteTask(task);
                else
                   return false;
             else
                return base.TryExecuteTask(task);
          }
    
          // Attempt to remove a previously scheduled task from the scheduler. 
          protected sealed override bool TryDequeue(Task task)
          {
             lock (_tasks) return _tasks.Remove(task);
          }
    
          // Gets the maximum concurrency level supported by this scheduler. 
          public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
    
          // Gets an enumerable of the tasks currently scheduled on this scheduler. 
          protected sealed override IEnumerable<Task> GetScheduledTasks()
          {
             bool lockTaken = false;
             try
             {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken) return _tasks;
                else throw new NotSupportedException();
             }
             finally
             {
                if (lockTaken) Monitor.Exit(_tasks);
             }
          }
       }
    
       static private int global_int = 10;
       public static Task<int> RunAsyncTask()
       {
          return LimitedConcurrencyLevelTaskScheduler.SingleFactory.StartNew<int>(Calculate);
       }
    
       private static int Calculate()
       {
          Thread.Sleep(500);
          Console.WriteLine("Ticket count: {0} Thread: {1}", --global_int, Thread.CurrentThread.ManagedThreadId);
          return global_int;
       }
    
       private static async Task InitiateCalculations()
       {
          List<Task<int>> list = new List<Task<int>>();
    
          Console.WriteLine("\nReseting: ");
          global_int = 10;
    
          for (int i = 0; i < 10; i++)
          {
             var task = RunAsyncTask();
             list.Add(task);
          }
    
          await Task.WhenAll(list.ToArray<Task<int>>());
       }
    }