代码之家  ›  专栏  ›  技术社区  ›  Eugene Mamaev

使用Kotlin Flow、Coroutines和NOT suspend函数的并行IO请求

  •  0
  • Eugene Mamaev  · 技术社区  · 4 年前

    我使用Spring Boot和WebFlux运行基于Netty的Kotlin应用程序。详情如下:

    • Java 11;
    • Kotlin 1.3.61;
    • 弹簧靴2.2.5.释放;
    • Spring Vault核心2.2.2.发布。

    我在web层上得到一个文件。WebFlux创建了一个 Part ( org.springframework.http.codec.multipart )数据存储在Project Reactor的 Flux 零件 作为一股洪流 DataBuffer 大小为4Kb的块:

    Flux<DataBuffer> content();
    

    由于符合框架的一致性,我对 通量 Kotlin的 Flow .

    然后我使用同步Vault客户端 encrypt(...) 在内部异步提交块(据我所知) flatMapMerge 方法(注 加密(…) 不是 suspend 并且它是远程加密提供商的HTTP客户端之上的包装器):

    public String encrypt(String keyName, String plaintext);
    

    我已经检查了这个答案 https://stackoverflow.com/a/58659423/6612401 并发现基于流的方法应与 flow { emit(...)} .

    我的问题是 我可以使用这种基于流的方法吗 暂停 功能?或者,考虑到我正在使用,是否有更好的方法 runBlocking(Dispatchers.IO) 以及a 暂停 fold(...) 功能。

    代码如下:

    @FlowPreview
    @ExperimentalCoroutinesApi
    private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
        val pair = part.content().asFlow()
                .flatMapMerge { dataBuffer ->
                    val openByteArray = dataBuffer.asInputStream().readBytes()
                    val opentextBase64 = Base64Utils.encodeToString(openByteArray)
                    flow { emit(Pair(openByteArray,  vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
                }.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
                    result.first.writeBytes(curPair.first)
                    result.second.append(curPair.second)
                    result
                }
        Pair(pair.first.toByteArray(), pair.second.toString())
    }
    

    P.S 折叠(…) 函数将打开的块收集到 ByteArrayOutputStream 稍后计算哈希值,并将加密块收集到 StringBuilder 作为加密文件的结果。

    P.S.我试过我的方法。该方法在我的Core i5 8gen 4物理内核机器上平均提交5-7个并行请求。它完成了它的工作,但没那么快。如果Vault不是在本地部署的,每1 Mb加密大约需要1秒。我知道这取决于网络的延迟。我甚至没有考虑到保险库一侧的加密速度,由于块的大小仅为4Kb,它的速度非常快。有什么方法可以提高并发速度吗?

    P.P.S.我试过玩 concurrency = MAX_CONCURRENT_REQUESTS 在里面 flatMapMerge{...} 到目前为止,结果并不显著。最好还是让它默认。

    0 回复  |  直到 4 年前