代码之家  ›  专栏  ›  技术社区  ›  user3044440

从集合创建元素流

  •  0
  • user3044440  · 技术社区  · 7 年前

    然而,有了这段代码,我只能运行1000条记录。如何使其无缝无阻塞地工作。

        MongoCollection<Document> collection = mydatabase.getCollection("mycoll");
        final List<Document> cache = Collections.synchronizedList(new ArrayList<Document>());
    
        FindIterable<Document> f = collection.find().batchSize(1000);
        f.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {
    
            @Override
            public void onResult(AsyncBatchCursor<Document> t, Throwable thrwbl) {
                t.next(new SingleResultCallback<List<Document>>() {
    
                    @Override
                    public void onResult(List<Document> t, Throwable thrwbl) {
                        if (thrwbl != null) {
                            th.set(thrwbl);
                        }
                        cache.addAll(t);
                        latch.countDown();;
    
                    }
                });
            }
        });
        latch.await();
        return cache.stream().map(batch->process(batch));
    

    更新的代码

    @ParameterizedTest
    @MethodSource("setUp")
    void cacheTest(MyClazz myclass) throws Exception {
        assertTrue(doTest(myclass));
    }
    public static MongoClient getMongoClient() {
     // get client here
    }
    
    private static Stream<MyClazz> setUp() throws Exception {
        MongoDatabase mydatabase = getMongoClient().getDatabase("test");
        List<Throwable> failures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(1);
        List<MyClazz> list = Collections.synchronizedList(new ArrayList<>());
                mydatabase.getCollection("testcollection").find()
                .toObservable().subscribe(
                document -> {
                    list.add(process(document));
                },
                throwable -> {
                    failures.add(throwable);
                },
                () -> {
                    latch.countDown();
                });
        latch.await();
        return list.stream();
    }
    
    public boolean doTest(MyClazz myclass) { 
    // processing goes here
    }
    public MyClazz process(Document doc) { 
    // doc gets converted to MyClazz
       return MyClazz;
    }
    

    即使是现在,我看到所有的数据都被加载,然后进行单元测试。

    我的用例是:我在mongo中有一百万条记录,并且正在用它们运行某种集成测试用例。将它们全部加载到内存中是不可行的,因此我正在尝试流媒体解决方案。

    1 回复  |  直到 7 年前
        1
  •  0
  •   glytching    7 年前

    我不认为我完全理解您的用例,但考虑到您的问题带有 java mongo-asyc-driver 这一要求当然是可以实现的:

    从集合中创建元素流以将其传递给测试。。。使其无缝工作,无阻塞

    以下代码:

    • 使用MongoDB RxJava驱动程序查询集合
    • 创建接收 Observable
    • 对此表示赞同
    • 记录例外
    • 标记完成

      CountDownLatch latch = new CountDownLatch(1);
      List<Throwable> failures = new ArrayList<>();
      collection.find()
              .toObservable().subscribe(
              // on next, this is invoked for each document returned by your find call
              document -> {
                  // presumably you'll want to do something here to meet this requirement: "pass it on to test in JUnit5" 
                  System.out.println(document);
              },
              /// on error
              throwable -> {
                  failures.add(throwable);
              },
              // on completion
              () -> {
                  latch.countDown();
              });
      // await the completion event
      latch.await(); 
      

    笔记:

    • 这需要使用 MongoDB RxJava driver (即 com.mongodb.rx.client 命名空间。。。这个 org.mongodb::mongodb-driver-rx
    • 在你的问题中,你正在援引 collection.find().batchSize() 这清楚地表明你不是 使用Rx驱动器(自 batchSize 不能是Rx友好概念:)
    • 以上代码通过MongoDB RxJava驱动程序的v1.4.0和的v1.1.10进行了验证 io.reactive::rxjava

    更新1 ? 我认为你正在从可观察的流中弹出一个列表 只有在可观察物耗尽之后 doTest() . 该方法涉及(1)来自MongoDB的流结果;(2) 将这些结果存储在内存中;(3) 正在运行 doTest() 对于每个存储的结果。如果你真的想一直流,那么你应该打电话 从您的observable订阅中。例如:

    mydatabase.getCollection("testcollection").find()
            .toObservable().subscribe(
            document -> {
                doTest(process(document));
            },
            throwable -> {
                failures.add(throwable);
            },
            () -> {
                latch.countDown();
            });
    latch.await();
    

    doTest() 当它从MongoDB接收到每个文档时,当整个可观察对象耗尽时,锁存将减少,您的代码将完成。