分享

C# 并发队列ConcurrentQueue

 丰收书屋 2019-05-19

C# 并发队列ConcurrentQueue

2016年03月08日 21:26:50 conquerwave 阅读数:17445

测试函数

  1. static async Task RunProgram()
  2. {
  3. var taskQueue = new ConcurrentQueue<CustomTask>();
  4. var cts = new CancellationTokenSource();
  5. //生成任务添加至并发队列
  6. var taskSource = Task.Run(() => TaskProducer(taskQueue));
  7. //同时启动四个任务处理队列中的任务
  8. Task[] processors = new Task[4];
  9. for(int i =1;i <= 4; i++)
  10. {
  11. string processId = i.ToString();
  12. processors[i - 1] = Task.Run(
  13. () => TaskProcessor(taskQueue, "Processor " + processId, cts.Token)
  14. );
  15. }
  16. await taskSource;
  17. //向任务发送取消信号
  18. cts.CancelAfter(TimeSpan.FromSeconds(2));
  19. await Task.WhenAll(processors);
  20. }

产生任务
  1. static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
  2. {
  3. for(int i= 0;i < 20; i++)
  4. {
  5. await Task.Delay(50);
  6. var workItem = new CustomTask { Id = i };
  7. queue.Enqueue(workItem);
  8. Console.WriteLine("task {0} has been posted", workItem.Id);
  9. }
  10. }

执行任务
  1. static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
  2. {
  3. CustomTask workItem;
  4. bool dequeueSuccesful = false;
  5. await GetRandomDelay();
  6. do
  7. {
  8. dequeueSuccesful = queue.TryDequeue(out workItem);
  9. if (dequeueSuccesful)
  10. {
  11. Console.WriteLine("task {0} has been processed by {1}", workItem.Id, name);
  12. }
  13. await GetRandomDelay();
  14. }
  15. while (!token.IsCancellationRequested);
  16. }

  1. static Task GetRandomDelay()
  2. {
  3. int delay = new Random(DateTime.Now.Millisecond).Next(1500);
  4. return Task.Delay(delay);
  5. }
  1. class CustomTask
  2. {
  3. public int Id { get; set; }
  4. }

调用
  1. static void Main(string[] args)
  2. {
  3. Task t = RunProgram();
  4. t.Wait();
  5. Console.ReadKey();
  6. }

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多