代码之家  ›  专栏  ›  技术社区  ›  ren

将传入的并行请求打包为一个请求的模式。

  •  1
  • ren  · 技术社区  · 6 年前

    假设有许多随机进入的线程并行访问同一个资源。要访问资源线程,需要获取锁。如果我们可以将n个传入线程打包到一个请求中,那么资源使用效率将提高n倍。另外,我们需要尽快回答个人的要求。在C语言中,最好的方法/模式是什么?

    目前我有这样的东西:

    //batches lock
    var ilock = ModifyBatch.GetTableDeleteBatchLock(table_info.Name);
    lock (ilock)
    {
        // put the request into requests batch
        if (!ModifyBatch._delete_batch.ContainsKey(table_info.Name))
        {
            ModifyBatch._delete_batch[table_info.Name] = new DeleteData() { Callbacks = new List<Action<string>>(), ids = ids };
        }
        else
        {
            ModifyBatch._delete_batch[table_info.Name].ids.UnionWith(ids);
        }
        //this callback will get called once the job is done by a thread that will acquire resource lock
        ModifyBatch._delete_batch[table_info.Name].Callbacks.Add(f =>
        {
            done = true;
            error = f;
        });
    }
    
    bool lockAcquired = false;
    int maxWaitMs = 60000;
    DeleteData _delete_data = null;
    
    //resource lock
    var _write_lock = GetTableWriteLock(typeof(T).Name);
    try
    {
        DateTime start = DateTime.Now;
        while (!done)
        {
            lockAcquired = Monitor.TryEnter(_write_lock, 100);
            if (lockAcquired)
            {
                if (done) //some other thread did our job
                                {
                    Monitor.Exit(_write_lock);
                    lockAcquired = false;
                    break;
                }
                else
                {
                    break;
                }
            }
            Thread.Sleep(100);
            if ((DateTime.Now - start).TotalMilliseconds > maxWaitMs)
            {
                throw new Exception("Waited too long to acquire write lock?");
            }
        }
        if (done) //some other thread did our job
        {
            if (!string.IsNullOrEmpty(error))
            {
                throw new Exception(error);
            }
            else
            {
                return;
            }
        }
    
        //not done, but have write lock for the table
        lock (ilock)
        {
            _delete_data = ModifyBatch._delete_batch[table_info.Name];
            var oval = new DeleteData();
            ModifyBatch._delete_batch.TryRemove(table_info.Name, out oval);
        }
        if (_delete_data.ids.Any())
        {
            //doing the work with resource 
        }
        foreach (var cb in _delete_data.Callbacks)
        {
            cb(null);
        }
    }
    catch (Exception ex)
    {
        if (_delete_data != null)
        {
            foreach (var cb in _delete_data.Callbacks)
            {
                cb(ex.Message);
            }
        }
        throw;
    }
    finally
    {
        if (lockAcquired)
        {
            Monitor.Exit(_write_lock);
        }
    }
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   rustyx    6 年前

    如果可以在当前请求的范围之外处理任务,即将其排队等待稍后处理,那么可以考虑这样的序列 :

    实现资源 (监视器)和 List 任务。

    1. 对于每个请求:

    2. 锁定列表,将当前任务添加到列表,记住列表中的任务数,解锁列表。

    3. 尝试获得 .

    4. 如果不成功:

      • 如果列表中的任务数为阈值X,则返回。
      • 否则获取锁(将阻塞)
    5. 锁定列表,将其内容移动到临时列表,解锁列表。

    6. 如果临时列表不为空

      • 执行临时列表中的任务。

      • 重复步骤5。

    7. 释放 .

    第一个请求将贯穿整个过程。如果第一个请求仍在执行,后续请求将在步骤4短路。

    调优最佳阈值x(或将其更改为基于时间的阈值)。


    如果你需要的话 等待 对于请求范围内的任务,您需要稍微扩展流程:

    向任务类添加两个字段: 完成标志 例外 .

    在第4步,返回之前, 等待 完成任务( Monitor.Wait )直到它 完成标志 变成 true . 如果 例外 不是 null 扔出去。

    在步骤6,为每个任务设置 完成标志 以及可选的 例外 然后 通知 服务员( Monitor.PulseAll )