代码之家  ›  专栏  ›  技术社区  ›  maaartinus

跟踪映射中队列之间的进度

  •  12
  • maaartinus  · 技术社区  · 6 年前

    我目前有两个队列和项目之间的旅行。最初,一个项目被放入 firstQueue ,然后三个专用线程中的一个将其移动到 secondQueue 最后,另一个专用线程将其删除。这些动作显然包括一些处理。我需要能得到任何物品的状态( IN_FIRST , AFTER_FIRST , IN_SECOND , AFTER_SECOND ABSENT )我通过更新 statusMap 队列被修改为

    while (true) {
        Item i = firstQueue.take();
        statusMap.put(i, AFTER_FIRST);
        process(i);
        secondQueue.add(i);
        statusMap.put(i, IN_SECOND);
    }
    

    这是可行的,但它很难看,并且留下了一个状态不一致的时间窗口。这种不一致并不是什么大问题,它可以通过同步来解决,但这可能适得其反,因为队列容量有限,可能会阻塞。丑陋更让我烦恼。

    由于处理过程需要几秒钟,所以效率几乎不重要。使用专用线程来控制并发性。任何项目都不应该处于多个状态(但这不是很重要,也不能由我当前的racy方法保证)。会有更多的队列(和州),它们会有不同的类型( DelayQueue , ArrayBlockingQueue ,也许 PriorityQueue )

    我想知道是否有一个好的解决方案可以推广到多个队列?

    4 回复  |  直到 6 年前
        1
  •  3
  •   DavidW    6 年前

    用管理项目状态的逻辑包装队列有意义吗?

    public class QueueWrapper<E> implements BlockingQueue<E> {
        private Queue<E> myQueue = new LinkedBlockingQueue<>();
        private Map<E, Status> statusMap;
    
        public QueueWrapper(Map<E, Status> statusMap) {
            this.statusMap = statusMap;
        }
    
        [...]
        @Override
        public E take() throws InterruptedException {
            E result = myQueue.take();
            statusMap.put(result, Status.AFTER_FIRST);
            return result;
        }
    

    这样,状态管理总是与队列操作相关(并包含在队列操作中)……

    显然 statusMap 需要同步,但无论如何这都是个问题。

        2
  •  3
  •   Aristofanio Garcia    6 年前

    我发现您的模型在一致性、状态控制和可伸缩性方面可能有所改进。

    实现这一点的一种方法是将项目与您的状态相耦合,将这对项目排队和出列,并创建一种机制来确保状态更改。

    我的建议如下图所示:

    enter image description here

    根据这个模型和您的示例,我们可以:

    package stackoverflow;
    
    import java.util.concurrent.LinkedBlockingQueue;
    
    import stackoverflow.item.ItemState;
    import stackoverflow.task.CreatingTask;
    import stackoverflow.task.FirstMovingTask;
    import stackoverflow.task.SecondMovingTask;
    
    public class Main {
    
        private static void startTask(String name, Runnable r){
            Thread t = new Thread(r, name);
            t.start();
        }
    
        public static void main(String[] args) {
            //create queues
            LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
            LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
            //start three threads
            startTask("Thread#1", new CreatingTask(firstQueue));
            startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
            startTask("Thread#3", new SecondMovingTask(secondQueue));
        }
    }
    

    每个任务运行操作 op() 根据以下确认 项目状态 :

    三个专用线程之一将其移动到第二队列,最后 另一个专用线程将其删除。

    ItemState 是一个不可变的对象,它包含 Item 还有你的 State . 这确保了项和状态值之间的一致性。

    itemstate已确认下一个状态正在创建自我控制状态的机制:

    public class FirstMovingTask {
        //others codes
        protected void op() {
                try {
                    //dequeue
                    ItemState is0 = new ItemState(firstQueue.take());
                    System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
                    //process here
                    //enqueue
                    ItemState is1 = new ItemState(is0);
                    secondQueue.add(is1);
                    System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        //others codes
    }
    

    使用itemstate实现:

    public class ItemStateImpl implements ItemState {
        private final Item item;
        private final State state;
    
        public ItemStateImpl(Item i){
            this.item = i;
            this.state = new State();
        }
    
        public ItemStateImpl(ItemState is) {
            this.item = is.getItem();
            this.state = is.getState().next();
        }
    
        // gets attrs
    }
    

    可伸缩,因为您可以控制更多的状态 next() 推广移动任务以增加队列数量。

    结果:

    Item 0: AFTER_FIRST
    Item 0: IN_FIRST
    Item 0: IN_SECOND
    Item 0: AFTER_SECOND
    Item 1: IN_FIRST
    Item 1: AFTER_FIRST
    Item 1: IN_SECOND
    Item 1: AFTER_SECOND
    Item 2: IN_FIRST
    Item 2: AFTER_FIRST
    Item 2: IN_SECOND
    ... others
    

    更新(2018年7月6日):分析地图搜索的使用 使用诸如comparator之类的等于值在map中搜索可能不起作用,因为通常值和标识(key/hash)之间的映射不是一对一的(参见下图)。这样就需要为搜索结果为o(n)(最坏情况)的值创建一个排序列表。

    enter image description here

    具有 Item.getValuesHashCode() :

    private int getValuesHashCode(){
      return new HashCodeBuilder().append(value).hashCode();
    }
    

    在这种情况下,你必须 Vector<ItemState> 而不是 项目 使用密钥就像 getValuesHashCode . 改变状态控制机制,使项目的第一个引用和状态保持最新。见下文:

    //Main.class
    public static void main(String[] args) {
        ... others code ...
    
        //references repository
        ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
        //start three threads
        startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
    
        ... others code ...
    }
    
    //CreateTask.class
    protected void op() throws InterruptedException {
        //create item
        ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
        //put in monitor and enqueue
        int key = is.getHashValue();
        Vector<ItemState> items = map.get(key);
        if (items == null){
            items = new Vector<>();
            map.put(key, items);
        }
        items.add(is);
        //enqueue
        queue.put(is);
    }
    
    //FirstMovingTask.class
    protected void op() throws InterruptedException{
        //dequeue
        ItemState is0 = firstQueue.take();
        //process
        ItemState is1 = process(is0.next());
        //enqueue 
        secondQueue.put(is1.next());
    }
    
    //ItemState.class
    public ItemState next() {
        //required for consistent change state
        synchronized (state) {
            state = state.next();
            return this;
        }
    }
    

    要搜索,必须使用concurrentmapref.get(key)。结果将引用更新的itemstate。

    我的测试结果是:

    # key = hash("a")
    # concurrentMapRef.get(key)
    ...
    Item#7#0    : a - IN_FIRST 
    ... many others lines
    Item#7#0    : a - AFTER_FIRST 
    Item#12#1   : a - IN_FIRST 
    ... many others lines
    Item#7#0    : a - IN_SECOND 
    Item#12#1   : a - IN_FIRST 
    ... many others lines
    Item#7#0    : a - AFTER_SECOND 
    Item#12#1   : a - IN_FIRST 
    

    代码中的更多详细信息: https://github.com/ag-studies/stackoverflow-queue

    2018年9月6日更新:重新设计

    概括这个项目,我可以理解状态机是这样的:

    enter image description here

    通过这种方式,我将队列的工作线程分离,以改进概念。我用了一个备忘录来保存整个流程中项目的唯一引用。 当然,如果需要将itemstate保存在物理存储库中,可以使用基于事件的策略。

    这保留了先前的想法,并为概念创建了更清晰的形式。请参阅:

    enter image description here

    我知道每个作业都有两个队列(输入/输出)以及与业务模型的关系! 研究者总是会发现项目的最新和最一致的状态 .

    所以,回答你的问题:

    • 我可以找到 项目 在任何地方使用memoryREP(基本上是一个映射),包装状态和项 项目状态 ,并控制作业在排队或出列时的更改状态。

    • 除了运行next()

    • 州政府(为你的问题)一直是一致的

    • 在此模型中,可以使用任意队列类型、任意数量的作业/队列和任意数量的状态。

    • 另外这很漂亮!!

        3
  •  1
  •   Fabien Leborgne    6 年前

    如前所述,包装队列或项目将是可行的解决方案或两者兼而有之。

    public class ItemWrapper<E> {
       E item;
       Status status;
       public ItemWrapper(Item i, Status s){ ... }
       public setStatus(Status s){ ... }
       // not necessary if you use a queue wrapper (see queue wrapper)
       public boolean equals(Object obj) {
         if ( obj instanceof ItemWrapper)
           return item.equals(((ItemWrapper) obj).item) 
         return false;
       }
       public int hashCode(){
         return item;
       }
    }
    ...
    process(item) // process update status in the item
    ...
    

    可能有一个更好的方法,已经回答过了,就是有一个更新队列状态的queuewrapper。为了好玩,我不使用状态映射,但我使用以前的itemwrapper,它看起来更干净(状态映射也可以工作)。

    public class QueueWrapper<E> implements Queue<E> {
      private Queue<ItemWrapper<E>> myQueue;
      static private Status inStatus; // FIRST
      static private Status outStatus; // AFTER_FIRST
      public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
      @Override
      public boolean add(E e) {
        return myQueue.add(new ItemWrapper(e, inStatus));
      }
      @Override
      public E remove(){
        ItemWrapper<E> result = myQueue.remove();
        result.setStatus(outStatus)
        return result.item;
      }
      ...  
      }
    

    您还可以使用aop在队列中注入状态更新,而无需更改队列(状态映射应该比itemwrapper更合适)。

    也许我没有很好地回答你的问题,因为知道你的物品在哪里的一个简单方法是使用“contains”函数签入每个队列。

        4
  •  1
  •   kag0    6 年前

    这里有些不同于其他人所说的话。从队列服务和系统的世界中,我们得到了消息确认的概念。这很好,因为它还提供了一些内置的重试逻辑。

    我将从更高的层次上阐述它的工作原理,如果您需要,我可以添加代码。

    基本上你会有一个 Set 去排队。您将把队列包装在一个对象中,这样当您将一个项目出列时,会发生一些事情

    1. 项目已从队列中移除
    2. 项目将添加到关联集
    3. 已计划任务(lambda包含原子布尔值(默认为false))。运行时,它将从集合中移除项,如果布尔值为false,则将其放回队列中
    4. 项和布尔值周围的包装器将返回给调用方

    一次 process(i); 完成后,代码将向包装器指示接收确认,包装器将从集合中移除项并使布尔值为false。

    返回状态的方法只需检查项目所在的队列或集合。

    请注意,这将提供“至少一次”传递,意味着项目将至少处理一次,但如果处理时间太接近超时,则可能会多次处理。