
Spring WebFlux: "Only one connection receive subscriber allowed" when switchIfEmpty returns a ServerResponse

  •  0
  • kj007 XdebugX  · Tech community  · 6 years ago

    I want to handle the following case: if the user already exists, return an error; if not, create a new user.

    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<UserBO> userBOMono = request.bodyToMono(UserBO.class);
        Mono<String> email = userBOMono.map(UserBO::getEmail);
        Mono<User> userMono = email.flatMap(userRepository::findByEmail);
        return userMono.flatMap(user -> {
            // A user with this email already exists: respond with 409 Conflict.
            Mono<ErrorResponse> errorResponseMono = errorHandler.handleEmailAlreadyExist();
            return ServerResponse.status(HttpStatus.CONFLICT)
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(errorResponseMono, ErrorResponse.class);
        }).switchIfEmpty(Mono.defer(() -> {
            // No user found: map the request body, save it, and respond with 201 Created.
            Mono<User> newUserMono = userBOMono.flatMap(userMapping::mapUserBOToUser);
            Mono<User> dbUserMono = newUserMono.flatMap(userRepository::save);
            return ServerResponse.status(HttpStatus.CREATED)
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(dbUserMono, User.class);
        }));
    }
    

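    If the Mono is not empty, it returns the conflict response as I want. If it is empty, it should create a new user, but instead it throws the error below: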

    java.lang.IllegalStateException: Only one connection receive subscriber allowed.
        at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
        at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) ~[netty-transport-4.1.27.Final.jar:4.1.27.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
    

    Update: this is in fact the correct behavior according to the method definition:

    switchIfEmpty(Mono<? extends T> alternate)
    Fallback to an alternative Mono if this mono is completed without data
    

    This means that when I pass an empty Mono as the body, it works fine:

    return ServerResponse.status(HttpStatus.CREATED)
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(Mono.empty(), User.class);
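
    The fallback semantics quoted above can be illustrated with a rough plain-Java analogy. This sketch uses java.util.Optional rather than Reactor's Mono, so it is only an approximation: Optional.or stands in for switchIfEmpty, and the Supplier stands in for Mono.defer by delaying creation of the fallback until it is needed. The class name FallbackSketch, the counter, and the response strings are hypothetical names chosen for illustration.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

final class FallbackSketch {
    // Hypothetical counter: how many times the fallback was actually built.
    static final AtomicInteger fallbackBuilt = new AtomicInteger();

    // Stands in for the "create a new user" branch.
    static Optional<String> fallback() {
        fallbackBuilt.incrementAndGet();
        return Optional.of("201 CREATED");
    }

    // 'existing' stands in for the result of a lookup such as findByEmail.
    static String respond(Optional<String> existing) {
        return existing
                .map(user -> "409 CONFLICT")   // value present: conflict branch
                .or(FallbackSketch::fallback)  // empty: fallback, built lazily
                .orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(respond(Optional.of("alice@example.com")));
        System.out.println(respond(Optional.empty()));
        System.out.println("fallback built " + fallbackBuilt.get() + " time(s)");
    }
}
```

    In this sketch the fallback is built only for the empty case, which is the same reason the handler wraps its alternative in Mono.defer.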
    

    So if I want to send a Mono object as the response body, what is the right way to handle the switchIfEmpty case?

    2 answers  |  as of 6 years ago
        1
  •  2
  •   kj007 XdebugX    6 years ago

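    I finally solved this. I was reading the userBOMono stream twice, and that is what caused WebFlux to throw this error. The fix is to read the request body once and reuse the resulting value in both branches: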

    public Mono<ServerResponse> createUser(ServerRequest request) {
        // Subscribe to the request body only once; everything else works on the decoded value.
        Mono<UserBO> userBOMono = request.bodyToMono(UserBO.class);
        return userBOMono.flatMap(userBO -> {
            String email = userBO.getEmail();
            Mono<User> userMono = userRepository.findByEmail(email);
            return userMono.flatMap(user -> errorHandler.handleEmailAlreadyExist())
                    .switchIfEmpty(Mono.defer(() -> createNewUser(userBO)));
        });
    }

    private Mono<ServerResponse> createNewUser(UserBO userBO) {
        // Starts from the already-decoded userBO, so the body is not read again.
        Mono<User> userMono = Mono.just(userBO)
                .flatMap(userMapping::mapUserBOToUser)
                .flatMap(userRepository::save);
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(userMono, User.class);
    }
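
    To make the "read twice" problem concrete, here is a minimal plain-Java toy model. It is not Reactor Netty's implementation: OneShotBody, readTwice, and readOnce are hypothetical names, and the class only imitates a source that accepts a single subscriber. readTwice mirrors the code in the question, and readOnce mirrors the fix above.

```java
import java.util.function.Consumer;

final class OneShotBodySketch {
    /** Toy stand-in for a request body that may be consumed only once. */
    static final class OneShotBody {
        private final String payload;
        private boolean consumed;

        OneShotBody(String payload) {
            this.payload = payload;
        }

        void subscribe(Consumer<String> subscriber) {
            if (consumed) {
                throw new IllegalStateException(
                        "Only one connection receive subscriber allowed.");
            }
            consumed = true;
            subscriber.accept(payload);
        }
    }

    // Mirrors the question: two independent reads of the same body.
    static String readTwice(OneShotBody body) {
        StringBuilder out = new StringBuilder();
        body.subscribe(value -> out.append("lookup ").append(value));
        try {
            body.subscribe(value -> out.append(", save ").append(value));
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
        return out.toString();
    }

    // Mirrors the answer: read once, then reuse the decoded value.
    static String readOnce(OneShotBody body) {
        StringBuilder out = new StringBuilder();
        body.subscribe(value -> {
            out.append("lookup ").append(value);
            out.append(", save ").append(value);
        });
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(readTwice(new OneShotBody("a@example.com")));
        System.out.println(readOnce(new OneShotBody("a@example.com")));
    }
}
```

    The second subscription in readTwice is rejected, while readOnce performs both steps inside a single subscription, much like the single flatMap over userBOMono in the fixed handler.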
    
        2
  •  0
  •   cs95 abhishek58g    5 years ago

    I assume you are using WebClient to call this API.