代码之家  ›  专栏  ›  技术社区  ›  davidvera

Webflux post请求:只允许一个连接接收订阅者

  •  0
  • davidvera  · 技术社区  · 3 年前

    我尝试实现一个springboot应用程序,它允许我:

    • 在S3上上传mp3
    • 提取元数据并将其存储在mongo DB上。

    如果我想从mp3中提取元数据,我必须提供MultipartFile。不幸的是,我们无法发送通量。

    在我的控制器中,我确实传递了3个参数:

    @PostMapping
    public Mono<ResponseEntity<AudioDto>> saveAudioTrack(@RequestHeader HttpHeaders headers,
                                                         @RequestHeader String fileName,
                                                         @RequestBody Flux<ByteBuffer> body) {
        return this.audioService.saveAudioTrack(headers, fileName, body);
    }
    

    现在我有点困在我的服务层。我想构建我的响应,存储数据并将文件发布到S3上。为了存储元数据,mp3agic。

    public Mono<ResponseEntity<AudioDto>> saveAudioTrack(HttpHeaders headers, String fileName, Flux<ByteBuffer> body) {
        return Mono.from(body.flatMap(byteBuffer -> {
            Mono<MultipartFile> multipartFileMono = AppUtils.byteBufferToMultipartFile(headers, fileName, body);
            return multipartFileMono;
        }).flatMap(multipartFile -> {
            AudioEntity storedEntity = new AudioEntity();
            try {
                AudioDto audioDto = storeAudioMeta(multipartFile, headers);
                AudioEntity audioEntity = new AudioEntity();
                BeanUtils.copyProperties(audioDto, audioEntity);
                storedEntity = audioRepository.save(audioEntity).block();
            } catch (IOException | InvalidDataException | UnsupportedTagException e) {
                e.printStackTrace();
            }
            AudioDto storedAudio = new AudioDto();
            BeanUtils.copyProperties(storedEntity, storedAudio);
            ResponseEntity<AudioDto> responseEntity = ResponseEntity.status(HttpStatus.OK).body(storedAudio);
            return Mono.just(responseEntity);
        }));
    }
    

    我首先尝试将ByteBuffer转换为MultipartFile:

    public static Mono<MultipartFile> byteBufferToMultipartFile(HttpHeaders headers, String fileName, Flux<ByteBuffer> body) {
        System.out.println(headers);
        return Mono.from(body.flatMap(byteBuffer -> {
            byte[] bytes = byteBuffer.array();
            MultipartFile multipartFile = new MockMultipartFile(fileName, bytes);
            return Mono.just(multipartFile);
        }));
    }
    

    然后我调用了应该允许我持久化元数据的代码的一部分

    private AudioDto storeAudioMeta(MultipartFile multipartFile, HttpHeaders headers) throws IOException, InvalidDataException, UnsupportedTagException {
        AudioDto audioDto = new AudioDto();
        if (multipartFile.isEmpty()) {
            throw new IllegalStateException("Cannot upload empty file");
        }
        //Check if the file is an image => we'll check if it's a mp3
        if(!multipartFile.getContentType().equals("audio/mpeg")) {
            throw new IllegalStateException("File uploaded is not a mp3");
        }
        InputStream initialStream = multipartFile.getInputStream();
        byte[] buffer = new byte[initialStream.available()];
        initialStream.read(buffer);
        File targetFile = new File("src/main/resources/" + multipartFile.getOriginalFilename());
        try (OutputStream outStream = new FileOutputStream(targetFile)) {
            outStream.write(buffer);
        }
    
        Mp3File mp3file  = new Mp3File(targetFile.getPath());
        audioDto.setLength((int) mp3file.getLengthInSeconds());
        audioDto.setBitrate(mp3file.getBitrate());
        audioDto.setSampleRate(mp3file.getSampleRate());
        if (mp3file.hasId3v1Tag()) {
            ID3v1 id3v1Tag = mp3file.getId3v1Tag();
            audioDto.setTrack(id3v1Tag.getTrack());
            audioDto.setArtist(id3v1Tag.getArtist());
            audioDto.setTitle(id3v1Tag.getTitle());
            ...
        }
        if (mp3file.hasId3v2Tag()) {
            ID3v2 id3v2Tag = mp3file.getId3v2Tag();
            audioDto.setTrack(id3v2Tag.getTrack());
            ...
            audioDto.setAlbumImage(id3v2Tag.getAlbumImage());
            byte[] albumImageData = id3v2Tag.getAlbumImage();
            if (albumImageData != null) {
                audioDto.setAlbumImageSize(albumImageData.length);
                audioDto.setAlbumImageMimeType(id3v2Tag.getAlbumImageMimeType());
            }
        }
    
        //get file metadata
        Map<String, String> metadata = new HashMap<>();
        metadata.put("Content-Type", audioDto.getFile().getContentType());
        metadata.put("Content-Length", String.valueOf(audioDto.getFile().getSize()));
        String bucketName = environment.getProperty("amazon.aws.s3.audioBucket");
        String path = String.format("%s/%s", bucketName, UUID.randomUUID());
        String fileName = String.format("%s", audioDto.getFile().getOriginalFilename());
        // try {
        //     fileStore.upload(path, fileName, Optional.of(metadata), 
        audioDto.getFile().getInputStream());
        // } catch (IOException e) {
            throw new IllegalStateException("Failed to upload file", e);
        //}
        audioDto.setFileName(fileName);
        audioDto.setFilePath(path);
        return audioDto;
    }
    

    我返回了错误消息:

    java.lang.IllegalStateException: Only one connection receive subscriber allowed.
        at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182) ~[reactor-netty-core-1.0.12.jar:1.0.12]
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Error has been observed at the following site(s):
        *__checkpoint ⇢ Handler com.myapp.api.mediastreaming.ui.controller.AudioController#saveAudioTrack(HttpHeaders, String, Flux) [DispatcherHandler]
        *__checkpoint ⇢ HTTP POST "/audio/tracks" [ExceptionHandlingWebHandler]
    Stack trace:
            at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:339) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.core.publisher.Mono.subscribe(Mono.java:4399) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.11.jar:3.4.11]
            at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:584) ~[reactor-netty-http-1.0.12.jar:1.0.12]
            at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.12.jar:1.0.12]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:261) ~[reactor-netty-http-1.0.12.jar:1.0.12]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.69.Final.jar:4.1.69.Final]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.69.Final.jar:4.1.69.Final]
            at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    
    0 回复  |  直到 3 年前
        1
  •  0
  •   davidvera    3 年前

    受到本教程的启发,我终于找到了一个解决方案: https://www.vinsguru.com/spring-webflux-file-upload/

    在我的控制器中:

    @PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<Mono<AudioDto>> saveAudioTrack(@RequestPart("file") Mono<FilePart> filePartMono) {
        return new ResponseEntity<>(this.audioService.saveAudioTrack(filePartMono), HttpStatus.CREATED);
    }
    

    在我的服务层:

    @Override
    public Mono<AudioDto> saveAudioTrack(Mono<FilePart> filePartMono) {
        Path basePath = Paths.get("./src/main/resources/upload/");
        return filePartMono
                .doOnNext(fp -> System.out.println("Received File : " + fp.filename()))
                .flatMap(fp -> fp.transferTo(basePath.resolve(fp.filename())))
                .then(
                        filePartMono
                                .flatMap(fp -> Mono.just(new File(String.valueOf(basePath.resolve(fp.filename())))))
                                .map(file -> {
                                    try {
                                        return getAudioMetadata(file);
                                    } catch (InvalidDataException | UnsupportedTagException | IOException e) {
                                        e.printStackTrace();
                                        throw new RuntimeException(e.getLocalizedMessage());
                                    }
                                })
                );
    
    }
    

    我还设法获取元数据并将文件上传到S3 bucket:

    我调用我的方法来创建元数据

    private AudioDto getAudioMetadata(File file) throws InvalidDataException, UnsupportedTagException, IOException {
        Mp3File mp3file  = new Mp3File(file.getPath());
        AudioDto audioDto = new AudioDto();
       ...
        if (mp3file.hasId3v1Tag()) {
            ID3v1 id3v1Tag = mp3file.getId3v1Tag();
            audioDto.setTrack(id3v1Tag.getTrack());
            ...   
        }
        if (mp3file.hasId3v2Tag()) {
            ID3v2 id3v2Tag = mp3file.getId3v2Tag();
            audioDto.setTrack(id3v2Tag.getTrack());
            ...
            audioDto.setAlbumImage(id3v2Tag.getAlbumImage());
            byte[] albumImageData = id3v2Tag.getAlbumImage();
            if (albumImageData != null) {
                audioDto.setAlbumImageSize(albumImageData.length);
                audioDto.setAlbumImageMimeType(id3v2Tag.getAlbumImageMimeType());
            }
        }
    
        //get file metadata
        Map<String, String> metadata = new HashMap<>();
        Tika tika = new Tika();
    
        metadata.put("Content-Type", tika.detect(file));
        metadata.put("Content-Length", String.valueOf(file.length()));
        String path = String.format("%s/%s", environment.getProperty("amazon.aws.s3.audioBucket"), UUID.randomUUID());
        //String fileName = String.format("%s", audioDto.getFile().getOriginalFilename());
        try {
            InputStream inputSteam = new FileInputStream(file);
            fileStore.upload(path, file.getName(), Optional.of(metadata), inputSteam);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to upload file", e);
        }
        audioDto.setFileName(file.getName());
        ...
        ModelMapper modelMapper = new ModelMapper();
        modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
        AudioEntity audioFileEntity = modelMapper.map(audioDto, AudioEntity.class);
        AudioEntity storedEntity = audioRepository.save(audioFileEntity);
    
        return modelMapper.map(storedEntity, AudioDto.class);
    }
    

    以下是我的上传方法:

    private final AmazonS3 amazonS3;
    
    public void upload(String path,
                       String fileName,
                       Optional<Map<String, String>> optionalMetaData,
                       InputStream inputStream) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        optionalMetaData.ifPresent(map -> {
            if (!map.isEmpty()) {
                map.forEach(objectMetadata::addUserMetadata);
            }
        });
        try {
            amazonS3.putObject(path, fileName, inputStream, objectMetadata);
        } catch (AmazonServiceException e) {
            throw new IllegalStateException("Failed to upload the file", e);
        }
    }
    

    不是最优雅的方式,而是功能性的。在告知其完全可操作之前,需要进行一些训练,但没有什么需要技巧的地方。:)

    推荐文章