代码之家  ›  专栏  ›  技术社区  ›  Chris

具有反应式Mongo和Web客户端的非阻塞功能方法

  •  0
  • Chris  · 技术社区  · 6 年前

    我有一个微服务,它使用 ReactiveMongoRepository 接口。

    目标是获取这些对象中的每一个,并将其推送到一个AWS lambda函数(在将其转换为DTO之后)。如果lambda函数的结果在200范围内,则将对象标记为成功,否则忽略。

    在一个简单的MongoRepository和一个restTemplate的旧时代,这将是一个微不足道的任务。然而,我试图理解这种被动的交易,并避免阻塞。

    这是我编出来的代码,我知道我在阻止 webClient 但我该如何避免呢?

    @Override
    public Flux<Video> index() {
        return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
            final SearchDTO searchDTO = SearchDTO.builder()
                    .name(video.getName())
                    .canonicalPath(video.getCanonicalPath())
                    .objectID(video.getObjectID())
                    .userId(video.getUserId())
                    .build();
    
            // Blocking call
            final HttpStatus httpStatus = webClient.post()
                    .uri(URI.create(LAMBDA_ENDPOINT))
                    .body(BodyInserters.fromObject(searchDTO)).exchange()
                    .block()
                    .statusCode();
    
            if (httpStatus.is2xxSuccessful()) {
                video.setIndexed(true);
            }
    
            return videoRepository.save(video);
        });
    }
    

    我从一个预定的任务调用上面的内容,我并不真正关心index()方法的实际结果,只关心在这个过程中会发生什么。

    @Scheduled(fixedDelay = 60000)
    public void indexTask() {
        indexService
                .index()
                .log()
                .subscribe();
    }
    

    我读过很多关于这个主题的博客文章,但它们都是简单的CRUD操作,中间没有任何事情发生,所以不要给我一个如何实现这些东西的完整画面。

    有什么帮助吗?

    2 回复  |  直到 6 年前
        1
  •  1
  •   Brian Clozel    6 年前

    您的解决方案实际上非常接近。 在这些情况下,您应该尝试逐步分解反应链,并毫不犹豫地将位转换为独立的方法,以获得清晰的效果。

    @Override
    public Flux<Video> index() {
    
        Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
        return unindexedVideos.flatMap(video -> {
            final SearchDTO searchDTO = SearchDTO.builder()
                    .name(video.getName())
                    .canonicalPath(video.getCanonicalPath())
                    .objectID(video.getObjectID())
                    .userId(video.getUserId())
                    .build();
    
            Mono<ClientResponse> indexedResponse = webClient.post()
                .uri(URI.create(LAMBDA_ENDPOINT))
                .body(BodyInserters.fromObject(searchDTO)).exchange()
                .filter(res -> res.statusCode().is2xxSuccessful());
    
            return indexedResponse.flatMap(response -> {
                video.setIndexed(true);
                return videoRepository.save(video);
            });
        });
    
        2
  •  0
  •   piotr szybicki    6 年前

    我的方法,也许更易读一点。但我承认我没有运行它,所以不能100%保证它会工作。

    public Flux<Video> index() {
        return videoRepository.findAll()
            .flatMap(this::callLambda)
            .flatMap(videoRepository::save);
    }
    
    private Mono<Video> callLambda(final Video video) {
        SearchDTO searchDTO = new SearchDTO(video);
        return webClient.post()
                .uri(URI.create(LAMBDA_ENDPOINT))
                .body(BodyInserters.fromObject(searchDTO))
                .exchange()
                .map(ClientResponse::statusCode)
                .filter(HttpStatus::is2xxSuccessful)
                .map(t -> {
                    video.setIndexed(true);
                    return video;
                });
    }