我正在访问一个REST服务器,它公开了一个带有头Content-Encoding = gzip和Content-Type = application/json的GET端点,因此服务器在发送响应之前压缩数据。
我试图在基于时间的数据的S3上做一种备份,服务器只允许获取1分钟的块。因此,在1天内,我需要发送1440 (一天中的分钟)请求。
此数据(压缩约10 s3,未压缩约每分钟70 s3),我想发送到s3通过多部分上传压缩。
从我的所有测试中,我无法找到阻止Netty解压响应的方法,所有使此反应的努力也都失败了。
我的当事人:
@Client("${url}")
@Header(name = "Accept-encoding", value = "gzip")
public interface MyClient {
@Get(value = "/data-feeds", consumes = MediaType.APPLICATION_OCTET_STREAM)
Flux<byte[]> getData(@QueryValue("from") String from,
@QueryValue("minute") String minute,
@QueryValue("filters") List<Object> filters);我也尝试过许多其他事情,比如:返回类型ByteBuffer或使用ReactorStreamingHttpClient dataStream
为了得到我做过的数据:
Flux.range(0,1439)
.flatMapSequential(minute ->
client.getData('2022-01-01',minute,Collections.emptyList()),20)这是它的第一部分,之后我通过一个GZIPOutputStream将数据映射到一个新的byteArray,我需要bufferUntil它得到5mb块或更多来完成S3多部分上传。
我可以从日志中看到调用是在不同的事件循环线程上进行的,我在Schedulers.boundedElastic()上执行对压缩的Schedulers.boundedElastic()的映射,但是应用程序仍然是线性的。
完成数据2分钟的时间是1的两倍。
我知道这是两个问题,但我认为已经不需要对接收到的数据进行解压缩并再次压缩它将节省我的时间。
发布于 2022-08-15 19:45:27
最后是对第二个问题的答复。
这里发生的情况是:
因此,您需要一个不篡改响应的低级客户端。问题是:你真的想走那条小巷吗?
我想您希望每个响应都有一个压缩文件。那么,你很有可能不得不自己去处理这个问题。micronaut.server.netty.max-chunk-size表示,最大默认值为8192 (文档),并表示在您的情况下有推荐设置的块数。
如果您的JSON是一个数据数组(您没有在您的问题中显示响应),我将选择一个简单的、反应性的客户机(就像您已经做的那样),并一次将数据流到GZIPOutputStream中一个数组项中。
或者,将压缩任务卸载到AWS Lambda,以保持代码整洁和后续人员高兴:)
你的第二个问题:
我可以从日志中看到调用是在不同的事件循环线程上进行的,我在Schedulers.boundedElastic()上执行对压缩的Schedulers.boundedElastic()的映射,但是应用程序仍然是线性的。
在没有完整代码的情况下,我通过在正确的地方引入subscribeOn来说明不同之处。
该平台是一种方法someMethodWithLatency,在使用Mono.fromCallable的2ms之后返回一个Mono<Int>。它在flatMapSequential内部被调用。第二部分是反应链(请参见下面的代码)。
结果
,我一直遵循这些经验法则,
这一主题在许多文章中都有讨论。
我使用了一些来自疑问的代码,请注意flatMapSequential是在没有maxConcurrency的情况下使用的。
请原谅我在下面的代码中使用Kotlin。
import io.kotest.core.spec.style.BehaviorSpec
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
val log = loggerFor<FluxTest>()
/**
* mock method to be used inside flatMapSequential
*/
fun someMethodWithLatency(index: Int): Mono<Int> = Mono.fromCallable {
runBlocking(Dispatchers.IO) {
/** artificial latency of 2 ms */
delay(2)
index
}
}
class FluxTest : BehaviorSpec({
given("some code for testing flatMapSequential") {
val startTime = AtomicLong()
`when`("subscribing to chain") {
Flux.range(0, 1439)
.doOnSubscribe {
startTime.set(System.nanoTime())
}.flatMapSequential {
someMethodWithLatency(it)
/** without subscribeOn: Completed in ~3080 ms */
/** with subscribeOn: Completed in ~170 ms */
.subscribeOn(Schedulers.boundedElastic())
}.doFinally {
log.info(
"Completed in {} ms",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get())
)
}.subscribe()
}
}
})https://stackoverflow.com/questions/73359065
复制相似问题