首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Axon框架的超低延迟进程

使用Axon框架的超低延迟进程
EN

Stack Overflow用户
提问于 2021-09-02 12:36:34
回答 1查看 434关注 0票数 2

因此,我正在为一个使用axon和Spring框架的低延迟交易引擎开发一个PoC。对于单个进程流,是否有可能实现低至10-50ms的延迟?这一过程将包括验证、订单和风险管理。我已经在一个简单的应用程序上做了一些初步的测试,以更新订单状态并执行它,并且我正在300ms+中以延迟的方式计时。这让我好奇我能用Axon优化多少?

编辑:

延迟问题与Axon无关。使用InMemoryEventStorageEngineDisruptorCommandBus设法将其降低到每个进程流5ms。

信息的流动是这样的。NewOrderCommand(published (客户端) -> OrderCreated(published (聚合) -> ExecuteOrder(published ( saga) -> OrderExecutionRequested -> ConfirmOrderExecution(published ( saga) -> OrderExecuted(published (聚合)

编辑2:最终切换到Axon,但正如预期的那样,平均延迟上升到150 as。使用Docker安装了Axon。如何使用AxonServer优化应用程序以实现亚毫秒延迟前进?任何指点都会受到赞赏。

编辑3:@Steven,根据您的建议,我已经设法将延迟降低到平均10 is,这是一个很好的开始!然而,是否有可能把它进一步降低呢?由于我现在测试的只是一系列流程中的一个小流程,比如验证、风险管理和位置跟踪,然后才能最终执行订单输出。所有这些都应该在5ms或更短的时间内完成。更糟的情况是容忍10毫秒(这是更新的时间预算)。另外,请注意在下面的吐露,新的读数是基于一个InMemorySagaStore支持的WeakReferenceCache。真的很感谢你的帮助!

OrderAggregate:

代码语言:javascript
复制
@Aggregate
internal class OrderAggregate {
    @AggregateIdentifier(routingKey = "orderId")
    private lateinit var clientOrderId: String
    private var orderId: String = UUID.randomUUID().toString()
    private lateinit var state: OrderState
    private lateinit var createdAtSource: LocalTime

    private val log by Logger()

    constructor() {}

    @CommandHandler
    constructor(command: NewOrderCommand) {
        log.info("received new order command")
        val (orderId, created) = command
        apply(
                OrderCreatedEvent(
                        clientOrderId = orderId,
                        created = created
                )
        )
    }

    @CommandHandler
    fun handle(command: ConfirmOrderExecutionCommand) {
        apply(OrderExecutedEvent(orderId = command.orderId, accountId = accountId))
    }

    @CommandHandler
    fun execute(command: ExecuteOrderCommand) {
        log.info("execute order event received")
        apply(
                OrderExecutionRequestedEvent(
                        clientOrderId = clientOrderId
                )
        )
    }

    @EventSourcingHandler
    fun on(event: OrderCreatedEvent) {
        log.info("order created event received")
        clientOrderId = event.clientOrderId
        createdAtSource = event.created
        setState(Confirmed)
    }

    @EventSourcingHandler
    fun on(event: OrderExecutedEvent) {
        val now = LocalTime.now()
        log.info(
                "elapse to execute: ${
                    createdAtSource.until(
                            now,
                            MILLIS
                    )
                }ms. created at source: $createdAtSource, now: $now"
        )
        setState(Executed)
    }

    private fun setState(state: OrderState) {
        this.state = state
    }
}

OrderManagerSaga:

代码语言:javascript
复制
@Profile("rabbit-executor")
@Saga(sagaStore = "sagaStore")
class OrderManagerSaga {
    @Autowired
    private lateinit var commandGateway: CommandGateway

    @Autowired
    private lateinit var executor: RabbitMarketOrderExecutor
    private val log by Logger()

    @StartSaga
    @SagaEventHandler(associationProperty = "clientOrderId")
    fun on(event: OrderCreatedEvent) {
        log.info("saga received order created event")
        commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId, accountId = event.accountId))
    }

    @SagaEventHandler(associationProperty = "clientOrderId")
    fun on(event: OrderExecutionRequestedEvent) {
        log.info("saga received order execution requested event")
        try {
            //execute order
            commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
        } catch (e: Exception) {
            log.error("failed to send order: $e")
            commandGateway.send<Any>(
                    RejectOrderCommand(
                            orderId = event.clientOrderId
                    )
            )
        }
    }
}

豆类:

代码语言:javascript
复制
@Bean
fun eventSerializer(mapper: ObjectMapper): JacksonSerializer{
    return JacksonSerializer.Builder()
            .objectMapper(mapper)
            .build()
}

@Bean
fun commandBusCache(): Cache {
    return WeakReferenceCache()
}

@Bean
fun sagaCache(): Cache {
    return WeakReferenceCache()
}

   
@Bean
fun associationsCache(): Cache {       
    return WeakReferenceCache()
}

@Bean
fun sagaStore(sagaCache: Cache, associationsCache: Cache): CachingSagaStore<Any>{    
    val sagaStore = InMemorySagaStore()
    return CachingSagaStore.Builder<Any>()
            .delegateSagaStore(sagaStore)
            .associationsCache(associationsCache)
            .sagaCache(sagaCache)
            .build()
}

@Bean
fun commandBus(
        commandBusCache: Cache,
        orderAggregateFactory: SpringPrototypeAggregateFactory<Order>,
        eventStore: EventStore,
        txManager: TransactionManager,
        axonConfiguration: AxonConfiguration,
        snapshotter: SpringAggregateSnapshotter
): DisruptorCommandBus {  
    val commandBus = DisruptorCommandBus.builder()
            .waitStrategy(BusySpinWaitStrategy())
            .executor(Executors.newFixedThreadPool(8))
            .publisherThreadCount(1)
            .invokerThreadCount(1)
            .transactionManager(txManager)
            .cache(commandBusCache)
            .messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus::class.java, "commandBus"))
            .build()
    commandBus.registerHandlerInterceptor(CorrelationDataInterceptor(axonConfiguration.correlationDataProviders()))
    return commandBus
}

Application.yml:

代码语言:javascript
复制
axon:
  server:
    enabled: true
  eventhandling:
    processors:
      name:
        mode: tracking
        source: eventBus
  serializer:
    general : jackson
    events : jackson
    messages : jackson
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-07 13:15:58

原始响应

您的设置描述是彻底的,但我认为仍然有一些选项,我可以推荐。这涉及到框架内的许多位置,所以如果对Axon中的位置或目标的建议有什么不清楚的地方,请随意添加一个评论,以便我可以更新我的回应。

现在,让我们列出我想要做的事情:

  • 如果加载需要很长时间,则为聚合设置快照。使用AggregateLoadTimeSnapshotTriggerDefinition可配置。
  • 为聚合引入缓存。我会从尝试WeakReferenceCache开始。如果这还不够,就值得研究EhCache和JCache适配器。或者建造你自己的。顺便提一下,关于聚合缓存的部分是下面是
  • 为你的传奇引入了一个缓存。我会从尝试WeakReferenceCache开始。如果这还不够,就值得研究EhCache和JCache适配器。或者建造你自己的。顺便提一下,下面是是关于佐贺缓存的一节。
  • 你真的需要一个传奇在这个设置?这个过程看起来很简单,可以在常规的事件处理组件中运行。如果是这样的话,,而不是,在Saga流中移动可能也会带来一个加速。
  • 你试过优化DisruptorCommandBus吗?尝试使用WaitStrategy、publisher线程计数、调用线程计数和使用的Executor
  • 尝试PooledStreamingEventProcessor (简称PSEP)而不是TrackingEventProcessor (简称TEP)。前者提供了更多的配置选项。顺便说一下,与TEP相比,默认值已经提供了更高的吞吐量。增加“批次大小”允许您一次摄入更多的事件。您还可以更改PSEP用于事件检索工作(由协调器完成)和事件处理(工作人员执行器负责此操作)使用的Executor
  • 您还可以在Axon Server上配置一些可以提高吞吐量的东西。试试event.events-per-segment-prefetchevent.read-buffer-sizecommand-thread。可能还有其他可行的选项,因此值得查看整个选项列表( 这里 )。
  • 虽然很难推断这是否会立即产生好处,但您可以为Axon提供更多的内存/ CPU。至少2Gb堆和4个核心。玩这些数字可能也有帮助。

可能还有更多的东西要分享,但这些都是我心中最重要的东西。希望这对你有所帮助,大卫!

第二次反应

为了进一步推断出我们可以在哪里获得更高的性能,我认为必须知道您的应用程序正在处理的哪个进程所花费的时间最长。这将使我们能够推断出,如果我们能够改进,应该改进什么。

你有没有试过做一个线程转储来推断哪个部分占用的时间最多?如果您可以分享这一点,作为对您的问题的更新,我们可以开始考虑以下步骤。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69030439

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档