我从Axon框架开始,遇到了一些障碍。
虽然我可以使用ID加载单个聚合,但我不知道如何获得所有聚合的列表或所有聚合ID的列表。
EventSourcingRepository类只有返回一个聚合的load()方法。
是否有一种方法可以获得所有聚合( Is ),还是应该在axon之外保存所有聚合Is的列表?
为了保持简单,我现在只使用InMemoryEventStorageEngine。我用的是Axon 3.0.7。
发布于 2017-11-13 08:31:32
首先,我想知道为什么要从Repository检索所有聚合的完整列表。Repository接口的设置使您可以加载一个Aggregate来处理命令或创建一个新的Aggregate。
在问到您的问题时,我几乎可以猜到您是将其用于查询目的,而不是命令处理。然而,这并不是EventSourcingRepository的预期用途。
我可以考虑的一个原因是,您希望实现一个API调用,以便将命令发布到应用程序中特定类型的所有Aggregates。考虑到这个场景,是的,您需要自己存储aggregateId引用。
但以我前面的问题结束:为什么要通过Repository接口检索聚合列表?
应答更新
关于你的评论,我在回答中补充了以下几点:
Axon帮助您在设置应用程序时考虑到事件来源,但也帮助您设置CQRS (命令查询责任隔离)。因此,这意味着您的应用程序的命令和查询端被拆开了。
聚合Repository是应用程序的命令端,您在其中请求执行操作。因此,它不提供聚合列表,因为命令是对(聚合)的意图的表示。因此,它只需要Repository用户检索一个聚合或创建一个聚合。
您需要使用Aggregates列表的示例是应用程序的查询端。查询端(您的视图/实体)通常是基于事件(通过事件来源)更新的。对于应用程序中的任何查询需求,通常都会引入一个单独的视图,以满足您的需要。
在您的示例中,这意味着您将引入一个事件处理组件,侦听聚合事件,使用聚合的查询模型更新Repository。
发布于 2019-11-28 00:51:03
传递到EventSourcingRepository中的EventSourcingRepository实现了StreamableMessageSource<M extends Message<?>>,这是一种获取聚合的方法。
虽然使用事件处理组件的框架方式可能会更好地扩展(取决于其使用/上下文的方式),但我确信事件处理组件无论如何都是由StreamableMessageSource<M extends Message<?>>驱动的。因此,如果我们想跳过框架,直接进入,我们可以这样做:
List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
return immediatelyAvailableStream(eventStore.openStream(
eventStore.createTailToken() /* All events in the event store */
))
.filter(e -> e instanceof DomainEventMessage)
.map(e -> (DomainEventMessage) e)
.map(DomainEventMessage::getAggregateIdentifier)
.distinct()
.collect(Collectors.toList());
}
/*
Note that the stream returned by BlockingStream.asStream() will block / won't terminate
as it waits for future elements.
*/
static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
Iterator<M> iterator = new Iterator<M>() {
@Override
public boolean hasNext() {
return messageStream.hasNextAvailable();
}
@Override
public M next() {
try {
return messageStream.nextAvailable();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Didn't expect to be interrupted");
}
}
};
Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
Stream stream = StreamSupport.stream(spliterator, false);
return (Stream)stream.onClose(messageStream::close);
}https://stackoverflow.com/questions/47258755
复制相似问题