首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Micronaut将数据http响应压缩

Micronaut将数据http响应压缩
EN

Stack Overflow用户
提问于 2022-08-15 09:24:37
回答 1查看 159关注 0票数 1

我正在访问一个REST服务器,它公开了一个带有头Content-Encoding = gzipContent-Type = application/json的GET端点,因此服务器在发送响应之前压缩数据。

我试图在基于时间的数据的S3上做一种备份,服务器只允许获取1分钟的块。因此,在1天内,我需要发送1440 (一天中的分钟)请求。

此数据(压缩约10 s3,未压缩约每分钟70 s3),我想发送到s3通过多部分上传压缩。

从我的所有测试中,我无法找到阻止Netty解压响应的方法,所有使此反应的努力也都失败了。

我的当事人:

代码语言:javascript
复制
@Client("${url}")
@Header(name = "Accept-encoding", value = "gzip")
public interface MyClient {

  @Get(value = "/data-feeds", consumes = MediaType.APPLICATION_OCTET_STREAM)
  Flux<byte[]> getData(@QueryValue("from") String from,
                           @QueryValue("minute") String minute,
                           @QueryValue("filters") List<Object> filters);

我也尝试过许多其他事情,比如:返回类型ByteBuffer或使用ReactorStreamingHttpClient dataStream

为了得到我做过的数据:

代码语言:javascript
复制
Flux.range(0,1439)
     .flatMapSequential(minute -> 
        client.getData('2022-01-01',minute,Collections.emptyList()),20)

这是它的第一部分,之后我通过一个GZIPOutputStream将数据映射到一个新的byteArray,我需要bufferUntil它得到5mb块或更多来完成S3多部分上传。

我可以从日志中看到调用是在不同的事件循环线程上进行的,我在Schedulers.boundedElastic()上执行对压缩的Schedulers.boundedElastic()的映射,但是应用程序仍然是线性的。

完成数据2分钟的时间是1的两倍。

我知道这是两个问题,但我认为已经不需要对接收到的数据进行解压缩并再次压缩它将节省我的时间。

EN

回答 1

Stack Overflow用户

发布于 2022-08-15 19:45:27

最后是对第二个问题的答复。

这里发生的情况是:

  • 你说你的客户正在接受gzip编码,
  • 端点正在返回gzip编码的响应。
  • 您的客户端正在将响应中的所有块组装回可读的内容。

因此,您需要一个不篡改响应的低级客户端。问题是:你真的想走那条小巷吗?

我想您希望每个响应都有一个压缩文件。那么,你很有可能不得不自己去处理这个问题。micronaut.server.netty.max-chunk-size表示,最大默认值为8192 (文档),并表示在您的情况下有推荐设置的块数。

如果您的JSON是一个数据数组(您没有在您的问题中显示响应),我将选择一个简单的、反应性的客户机(就像您已经做的那样),并一次将数据流到GZIPOutputStream中一个数组项中。

或者,将压缩任务卸载到AWS Lambda,以保持代码整洁和后续人员高兴:)

你的第二个问题:

我可以从日志中看到调用是在不同的事件循环线程上进行的,我在Schedulers.boundedElastic()上执行对压缩的Schedulers.boundedElastic()的映射,但是应用程序仍然是线性的。

在没有完整代码的情况下,我通过在正确的地方引入subscribeOn来说明不同之处。

该平台是一种方法someMethodWithLatency,在使用Mono.fromCallable的2ms之后返回一个Mono<Int>。它在flatMapSequential内部被调用。第二部分是反应链(请参见下面的代码)。

结果

  • 没有subscribeOn的运行时间:~3080 ms
  • 使用subscribeOn的运行时间:~170 ms

,我一直遵循这些经验法则,

  • 慢发布服务器+快速订阅服务器:使用subscribeOn
  • 快速发布服务器+慢订阅服务器:使用publishOn

这一主题在许多文章中都有讨论。

我使用了一些来自疑问的代码,请注意flatMapSequential是在没有maxConcurrency的情况下使用的。

请原谅我在下面的代码中使用Kotlin。

代码语言:javascript
复制
import io.kotest.core.spec.style.BehaviorSpec
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

val log = loggerFor<FluxTest>()

/**
 * mock method to be used inside flatMapSequential
 */
fun someMethodWithLatency(index: Int): Mono<Int> = Mono.fromCallable {
    runBlocking(Dispatchers.IO) {
        /** artificial latency of 2 ms */
        delay(2)
        index
    }
}

class FluxTest : BehaviorSpec({
    given("some code for testing flatMapSequential") {
        val startTime = AtomicLong()

        `when`("subscribing to chain") {

            Flux.range(0, 1439)
                .doOnSubscribe {
                    startTime.set(System.nanoTime())
                }.flatMapSequential {
                    someMethodWithLatency(it)
                        /** without subscribeOn: Completed in ~3080 ms */
                        /** with subscribeOn: Completed in ~170 ms */
                        .subscribeOn(Schedulers.boundedElastic())
                }.doFinally {
                    log.info(
                        "Completed in {} ms",
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get())
                    )
                }.subscribe()
        }
    }
})
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73359065

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档