我使用Spring Boot和WebFlux运行基于Netty的Kotlin应用程序。详情如下:
-
Java 11;
-
Kotlin 1.3.61;
-
弹簧靴2.2.5.释放;
-
Spring Vault核心2.2.2.发布。
我在web层上得到一个文件。WebFlux创建了一个
Part
(
org.springframework.http.codec.multipart
)数据存储在Project Reactor的
Flux
在
零件
作为一股洪流
DataBuffer
大小为4Kb的块:
Flux<DataBuffer> content();
由于符合框架的一致性,我对
通量
Kotlin的
Flow
.
然后我使用同步Vault客户端
encrypt(...)
在内部异步提交块(据我所知)
flatMapMerge
方法(注
加密(…)
不是
suspend
并且它是远程加密提供商的HTTP客户端之上的包装器):
public String encrypt(String keyName, String plaintext);
我已经检查了这个答案
https://stackoverflow.com/a/58659423/6612401
并发现基于流的方法应与
flow { emit(...)}
.
我的问题是
我可以使用这种基于流的方法吗
暂停
功能?或者,考虑到我正在使用,是否有更好的方法
runBlocking(Dispatchers.IO)
以及a
暂停
fold(...)
功能。
代码如下:
@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
val pair = part.content().asFlow()
.flatMapMerge { dataBuffer ->
val openByteArray = dataBuffer.asInputStream().readBytes()
val opentextBase64 = Base64Utils.encodeToString(openByteArray)
flow { emit(Pair(openByteArray, vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
}.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
result.first.writeBytes(curPair.first)
result.second.append(curPair.second)
result
}
Pair(pair.first.toByteArray(), pair.second.toString())
}
P.S
折叠(…)
函数将打开的块收集到
ByteArrayOutputStream
稍后计算哈希值,并将加密块收集到
StringBuilder
作为加密文件的结果。
P.S.我试过我的方法。该方法在我的Core i5 8gen 4物理内核机器上平均提交5-7个并行请求。它完成了它的工作,但没那么快。如果Vault不是在本地部署的,每1 Mb加密大约需要1秒。我知道这取决于网络的延迟。我甚至没有考虑到保险库一侧的加密速度,由于块的大小仅为4Kb,它的速度非常快。有什么方法可以提高并发速度吗?
P.P.S.我试过玩
concurrency = MAX_CONCURRENT_REQUESTS
在里面
flatMapMerge{...}
到目前为止,结果并不显著。最好还是让它默认。