我发现您的模型在一致性、状态控制和可伸缩性方面可能有所改进。
实现这一点的一种方法是将项目与您的状态相耦合,将这对项目排队和出列,并创建一种机制来确保状态更改。
我的建议如下图所示:
根据这个模型和您的示例,我们可以:
package stackoverflow;
import java.util.concurrent.LinkedBlockingQueue;
import stackoverflow.item.ItemState;
import stackoverflow.task.CreatingTask;
import stackoverflow.task.FirstMovingTask;
import stackoverflow.task.SecondMovingTask;
public class Main {
private static void startTask(String name, Runnable r){
Thread t = new Thread(r, name);
t.start();
}
public static void main(String[] args) {
//create queues
LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue));
startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
startTask("Thread#3", new SecondMovingTask(secondQueue));
}
}
每个任务运行操作
op()
根据以下确认
项目状态
:
三个专用线程之一将其移动到第二队列,最后
另一个专用线程将其删除。
ItemState
是一个不可变的对象,它包含
Item
还有你的
State
. 这确保了项和状态值之间的一致性。
itemstate已确认下一个状态正在创建自我控制状态的机制:
public class FirstMovingTask {
//others codes
protected void op() {
try {
//dequeue
ItemState is0 = new ItemState(firstQueue.take());
System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
//process here
//enqueue
ItemState is1 = new ItemState(is0);
secondQueue.add(is1);
System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//others codes
}
使用itemstate实现:
public class ItemStateImpl implements ItemState {
private final Item item;
private final State state;
public ItemStateImpl(Item i){
this.item = i;
this.state = new State();
}
public ItemStateImpl(ItemState is) {
this.item = is.getItem();
this.state = is.getState().next();
}
// gets attrs
}
可伸缩,因为您可以控制更多的状态
next()
推广移动任务以增加队列数量。
结果:
Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others
更新(2018年7月6日):分析地图搜索的使用
使用诸如comparator之类的等于值在map中搜索可能不起作用,因为通常值和标识(key/hash)之间的映射不是一对一的(参见下图)。这样就需要为搜索结果为o(n)(最坏情况)的值创建一个排序列表。
具有
Item.getValuesHashCode()
:
private int getValuesHashCode(){
return new HashCodeBuilder().append(value).hashCode();
}
在这种情况下,你必须
Vector<ItemState>
而不是
项目
使用密钥就像
getValuesHashCode
. 改变状态控制机制,使项目的第一个引用和状态保持最新。见下文:
//Main.class
public static void main(String[] args) {
... others code ...
//references repository
ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
... others code ...
}
//CreateTask.class
protected void op() throws InterruptedException {
//create item
ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
//put in monitor and enqueue
int key = is.getHashValue();
Vector<ItemState> items = map.get(key);
if (items == null){
items = new Vector<>();
map.put(key, items);
}
items.add(is);
//enqueue
queue.put(is);
}
//FirstMovingTask.class
protected void op() throws InterruptedException{
//dequeue
ItemState is0 = firstQueue.take();
//process
ItemState is1 = process(is0.next());
//enqueue
secondQueue.put(is1.next());
}
//ItemState.class
public ItemState next() {
//required for consistent change state
synchronized (state) {
state = state.next();
return this;
}
}
要搜索,必须使用concurrentmapref.get(key)。结果将引用更新的itemstate。
我的测试结果是:
# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_FIRST
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - IN_SECOND
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_SECOND
Item#12#1 : a - IN_FIRST
代码中的更多详细信息:
https://github.com/ag-studies/stackoverflow-queue
2018年9月6日更新:重新设计
概括这个项目,我可以理解状态机是这样的:
通过这种方式,我将队列的工作线程分离,以改进概念。我用了一个备忘录来保存整个流程中项目的唯一引用。
当然,如果需要将itemstate保存在物理存储库中,可以使用基于事件的策略。
这保留了先前的想法,并为概念创建了更清晰的形式。请参阅:
我知道每个作业都有两个队列(输入/输出)以及与业务模型的关系!
研究者总是会发现项目的最新和最一致的状态
.
所以,回答你的问题:
-
我可以找到
项目
在任何地方使用memoryREP(基本上是一个映射),包装状态和项
项目状态
,并控制作业在排队或出列时的更改状态。
-
除了运行next()
-
州政府(为你的问题)一直是一致的
-
在此模型中,可以使用任意队列类型、任意数量的作业/队列和任意数量的状态。
-
另外这很漂亮!!