我不认为我完全理解您的用例,但考虑到您的问题带有
java
mongo-asyc-driver
这一要求当然是可以实现的:
从集合中创建元素流以将其传递给测试。。。使其无缝工作,无阻塞
以下代码:
-
使用MongoDB RxJava驱动程序查询集合
-
创建接收
Observable
-
对此表示赞同
-
记录例外
-
标记完成
CountDownLatch latch = new CountDownLatch(1);
List<Throwable> failures = new ArrayList<>();
collection.find()
.toObservable().subscribe(
// on next, this is invoked for each document returned by your find call
document -> {
// presumably you'll want to do something here to meet this requirement: "pass it on to test in JUnit5"
System.out.println(document);
},
/// on error
throwable -> {
failures.add(throwable);
},
// on completion
() -> {
latch.countDown();
});
// await the completion event
latch.await();
笔记:
-
这需要使用
MongoDB RxJava driver
(即
com.mongodb.rx.client
命名空间。。。这个
org.mongodb::mongodb-driver-rx
-
在你的问题中,你正在援引
collection.find().batchSize()
这清楚地表明你不是
使用Rx驱动器(自
batchSize
不能是Rx友好概念:)
-
以上代码通过MongoDB RxJava驱动程序的v1.4.0和的v1.1.10进行了验证
io.reactive::rxjava
更新1
? 我认为你正在从可观察的流中弹出一个列表
只有在可观察物耗尽之后
doTest()
. 该方法涉及(1)来自MongoDB的流结果;(2) 将这些结果存储在内存中;(3) 正在运行
doTest()
对于每个存储的结果。如果你真的想一直流,那么你应该打电话
从您的observable订阅中。例如:
mydatabase.getCollection("testcollection").find()
.toObservable().subscribe(
document -> {
doTest(process(document));
},
throwable -> {
failures.add(throwable);
},
() -> {
latch.countDown();
});
latch.await();
doTest()
当它从MongoDB接收到每个文档时,当整个可观察对象耗尽时,锁存将减少,您的代码将完成。