首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Quarkus BackPressure配置

Quarkus BackPressure配置
EN

Stack Overflow用户
提问于 2020-08-14 00:32:21
回答 1查看 438关注 0票数 0

我使用quarkus smallrye reactive messaging和kafka获得了以下堆栈跟踪:

代码语言:javascript
复制
2020-07-24 01:38:31,662 ERROR [io.sma.rea.mes.kafka] (executor-thread-870) SRMSG18207: Unable to dispatch message to Kafka: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit tick 211 due to lack of requests
        at io.smallrye.mutiny.operators.multi.builders.IntervalMulti$IntervalRunnable.run(IntervalMulti.java:83)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossThread.run(JBossThread.java:479)

因此,我已经阅读了https://smallrye.io/smallrye-mutiny/#_how_do_i_control_the_back_pressure

根据文档,我添加了BackPressure控件。

之前:

代码语言:javascript
复制
@Outgoing( "eqs-crossing-xxx" )
    public Multi< EQSAlert > eqsCrossingXXX_XXX(){

        final String series = CrossingEnum.XXX_XXX.getSeries();
        final String equipment = CrossingEnum.XXX_XXX.getEquipment();

        final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );

        log.info( "Incoming request for {} - {}",  series, equipment);
        log.info( "Vehicle regex : {}", vehicleRegex );

        return Multi
                .createFrom()
                .ticks()
                .every(
                        Duration.ofSeconds( poolingInterval )
                )
                .concatMap(i -> {
                    final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client, series, equipment);

                    return crossingStateBySeriesAndEquipment.flatMap(crossingState ->
                            crossingState.isActive() ?
                                    EQSAlert.getEQSAlertBySeriesAndEquipment(
                                            client,
                                            series,
                                            vehicleRegex,
                                            equipment
                                    )
                                    :
                                    Multi.createFrom().empty()
                    );
                });
    }

之后:

代码语言:javascript
复制
@Outgoing( "eqs-crossing-xxx" )
    public Multi< EQSAlert > eqsCrossingXXX_XXX(){

        final String series = CrossingEnum.XXX_XXX.getSeries();
        final String equipment = CrossingEnum.XXX_XXX.getEquipment();

        final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );

        log.info( "Incoming request for {} - {}",  series, equipment);
        log.info( "Vehicle regex : {}", vehicleRegex );

        return Multi
                .createFrom()
                .ticks()
                .every(
                        Duration.ofSeconds( poolingInterval )
                )
                .onOverflow()
                .buffer(10)
                .concatMap(i -> {
                    final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client, series, equipment);

                    return crossingStateBySeriesAndEquipment.flatMap(crossingState ->
                            crossingState.isActive() ?
                                    EQSAlert.getEQSAlertBySeriesAndEquipment(
                                            client,
                                            series,
                                            vehicleRegex,
                                            equipment
                                    )
                                    :
                                    Multi.createFrom().empty()
                    );
                });
    }

现在一切都好了。

我这篇文章的目的是为了理解为什么我需要这样做?

https://smallrye.io/smallrye-mutiny/apidocs/io/smallrye/mutiny/subscription/BackPressureStrategy.html

为什么缓冲区不能容纳你?

如您所见,我每隔5秒执行一次简单的sql函数调用(poolingInterval)。该函数返回一些记录(通过池化决不会超过10条)

所以交通量非常低

我可以说几句话来理解缓冲区管理吗?

谢谢

EN

回答 1

Stack Overflow用户

发布于 2020-09-18 15:12:08

您的下游每单位时间只能消耗一定数量的项目。这取决于你在做什么。默认情况下,Kafka限制为5个并发写入(您可以配置)。

因此,如果你排放更多,以一种无界的-没有反压力感知-的方式,下游无法跟上。添加10个项目的缓冲区可能会处理小的凹凸,但这可能还不够。

drop更激进;如果下游跟不上,这些项目就会被简单地丢弃。dropPreviousItems正在丢弃已收到的项目。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63399473

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档