因此,我正在为一个使用axon和Spring框架的低延迟交易引擎开发一个PoC。对于单个进程流,是否有可能实现低至10-50ms的延迟?这一过程将包括验证、订单和风险管理。我已经在一个简单的应用程序上做了一些初步的测试,以更新订单状态并执行它,并且我正在300ms+中以延迟的方式计时。这让我好奇我能用Axon优化多少?
编辑:
延迟问题与Axon无关。使用InMemoryEventStorageEngine和DisruptorCommandBus设法将其降低到每个进程流5ms。
信息的流动是这样的。NewOrderCommand(published (客户端) -> OrderCreated(published (聚合) -> ExecuteOrder(published ( saga) -> OrderExecutionRequested -> ConfirmOrderExecution(published ( saga) -> OrderExecuted(published (聚合)
编辑2:最终切换到Axon,但正如预期的那样,平均延迟上升到150 as。使用Docker安装了Axon。如何使用AxonServer优化应用程序以实现亚毫秒延迟前进?任何指点都会受到赞赏。
编辑3:@Steven,根据您的建议,我已经设法将延迟降低到平均10 is,这是一个很好的开始!然而,是否有可能把它进一步降低呢?由于我现在测试的只是一系列流程中的一个小流程,比如验证、风险管理和位置跟踪,然后才能最终执行订单输出。所有这些都应该在5ms或更短的时间内完成。更糟的情况是容忍10毫秒(这是更新的时间预算)。另外,请注意在下面的吐露,新的读数是基于一个InMemorySagaStore支持的WeakReferenceCache。真的很感谢你的帮助!
OrderAggregate:
@Aggregate
internal class OrderAggregate {
@AggregateIdentifier(routingKey = "orderId")
private lateinit var clientOrderId: String
private var orderId: String = UUID.randomUUID().toString()
private lateinit var state: OrderState
private lateinit var createdAtSource: LocalTime
private val log by Logger()
constructor() {}
@CommandHandler
constructor(command: NewOrderCommand) {
log.info("received new order command")
val (orderId, created) = command
apply(
OrderCreatedEvent(
clientOrderId = orderId,
created = created
)
)
}
@CommandHandler
fun handle(command: ConfirmOrderExecutionCommand) {
apply(OrderExecutedEvent(orderId = command.orderId, accountId = accountId))
}
@CommandHandler
fun execute(command: ExecuteOrderCommand) {
log.info("execute order event received")
apply(
OrderExecutionRequestedEvent(
clientOrderId = clientOrderId
)
)
}
@EventSourcingHandler
fun on(event: OrderCreatedEvent) {
log.info("order created event received")
clientOrderId = event.clientOrderId
createdAtSource = event.created
setState(Confirmed)
}
@EventSourcingHandler
fun on(event: OrderExecutedEvent) {
val now = LocalTime.now()
log.info(
"elapse to execute: ${
createdAtSource.until(
now,
MILLIS
)
}ms. created at source: $createdAtSource, now: $now"
)
setState(Executed)
}
private fun setState(state: OrderState) {
this.state = state
}
}OrderManagerSaga:
@Profile("rabbit-executor")
@Saga(sagaStore = "sagaStore")
class OrderManagerSaga {
@Autowired
private lateinit var commandGateway: CommandGateway
@Autowired
private lateinit var executor: RabbitMarketOrderExecutor
private val log by Logger()
@StartSaga
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderCreatedEvent) {
log.info("saga received order created event")
commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId, accountId = event.accountId))
}
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderExecutionRequestedEvent) {
log.info("saga received order execution requested event")
try {
//execute order
commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
} catch (e: Exception) {
log.error("failed to send order: $e")
commandGateway.send<Any>(
RejectOrderCommand(
orderId = event.clientOrderId
)
)
}
}
}豆类:
@Bean
fun eventSerializer(mapper: ObjectMapper): JacksonSerializer{
return JacksonSerializer.Builder()
.objectMapper(mapper)
.build()
}
@Bean
fun commandBusCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun associationsCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaStore(sagaCache: Cache, associationsCache: Cache): CachingSagaStore<Any>{
val sagaStore = InMemorySagaStore()
return CachingSagaStore.Builder<Any>()
.delegateSagaStore(sagaStore)
.associationsCache(associationsCache)
.sagaCache(sagaCache)
.build()
}
@Bean
fun commandBus(
commandBusCache: Cache,
orderAggregateFactory: SpringPrototypeAggregateFactory<Order>,
eventStore: EventStore,
txManager: TransactionManager,
axonConfiguration: AxonConfiguration,
snapshotter: SpringAggregateSnapshotter
): DisruptorCommandBus {
val commandBus = DisruptorCommandBus.builder()
.waitStrategy(BusySpinWaitStrategy())
.executor(Executors.newFixedThreadPool(8))
.publisherThreadCount(1)
.invokerThreadCount(1)
.transactionManager(txManager)
.cache(commandBusCache)
.messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus::class.java, "commandBus"))
.build()
commandBus.registerHandlerInterceptor(CorrelationDataInterceptor(axonConfiguration.correlationDataProviders()))
return commandBus
}Application.yml:
axon:
server:
enabled: true
eventhandling:
processors:
name:
mode: tracking
source: eventBus
serializer:
general : jackson
events : jackson
messages : jackson发布于 2021-09-07 13:15:58
原始响应
您的设置描述是彻底的,但我认为仍然有一些选项,我可以推荐。这涉及到框架内的许多位置,所以如果对Axon中的位置或目标的建议有什么不清楚的地方,请随意添加一个评论,以便我可以更新我的回应。
现在,让我们列出我想要做的事情:
AggregateLoadTimeSnapshotTriggerDefinition可配置。WeakReferenceCache开始。如果这还不够,就值得研究EhCache和JCache适配器。或者建造你自己的。顺便提一下,关于聚合缓存的部分是下面是。WeakReferenceCache开始。如果这还不够,就值得研究EhCache和JCache适配器。或者建造你自己的。顺便提一下,下面是是关于佐贺缓存的一节。DisruptorCommandBus吗?尝试使用WaitStrategy、publisher线程计数、调用线程计数和使用的Executor。PooledStreamingEventProcessor (简称PSEP)而不是TrackingEventProcessor (简称TEP)。前者提供了更多的配置选项。顺便说一下,与TEP相比,默认值已经提供了更高的吞吐量。增加“批次大小”允许您一次摄入更多的事件。您还可以更改PSEP用于事件检索工作(由协调器完成)和事件处理(工作人员执行器负责此操作)使用的Executor。event.events-per-segment-prefetch,event.read-buffer-size或command-thread。可能还有其他可行的选项,因此值得查看整个选项列表( 这里 )。可能还有更多的东西要分享,但这些都是我心中最重要的东西。希望这对你有所帮助,大卫!
第二次反应
为了进一步推断出我们可以在哪里获得更高的性能,我认为必须知道您的应用程序正在处理的哪个进程所花费的时间最长。这将使我们能够推断出,如果我们能够改进,应该改进什么。
你有没有试过做一个线程转储来推断哪个部分占用的时间最多?如果您可以分享这一点,作为对您的问题的更新,我们可以开始考虑以下步骤。
https://stackoverflow.com/questions/69030439
复制相似问题