首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Axon框架获取所有聚合?

如何使用Axon框架获取所有聚合?
EN

Stack Overflow用户
提问于 2017-11-13 07:06:19
回答 2查看 3.2K关注 0票数 3

我从Axon框架开始,遇到了一些障碍。

虽然我可以使用ID加载单个聚合,但我不知道如何获得所有聚合的列表或所有聚合ID的列表。

EventSourcingRepository类只有返回一个聚合的load()方法。

是否有一种方法可以获得所有聚合( Is ),还是应该在axon之外保存所有聚合Is的列表?

为了保持简单,我现在只使用InMemoryEventStorageEngine。我用的是Axon 3.0.7。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-11-13 08:31:32

首先,我想知道为什么要从Repository检索所有聚合的完整列表。Repository接口的设置使您可以加载一个Aggregate来处理命令或创建一个新的Aggregate

在问到您的问题时,我几乎可以猜到您是将其用于查询目的,而不是命令处理。然而,这并不是EventSourcingRepository的预期用途。

我可以考虑的一个原因是,您希望实现一个API调用,以便将命令发布到应用程序中特定类型的所有Aggregates。考虑到这个场景,是的,您需要自己存储aggregateId引用。

但以我前面的问题结束:为什么要通过Repository接口检索聚合列表?

应答更新

关于你的评论,我在回答中补充了以下几点:

Axon帮助您在设置应用程序时考虑到事件来源,但也帮助您设置CQRS (命令查询责任隔离)。因此,这意味着您的应用程序的命令和查询端被拆开了。

聚合Repository是应用程序的命令端,您在其中请求执行操作。因此,它不提供聚合列表,因为命令是对(聚合)的意图的表示。因此,它只需要Repository用户检索一个聚合或创建一个聚合。

您需要使用Aggregates列表的示例是应用程序的查询端。查询端(您的视图/实体)通常是基于事件(通过事件来源)更新的。对于应用程序中的任何查询需求,通常都会引入一个单独的视图,以满足您的需要。

在您的示例中,这意味着您将引入一个事件处理组件,侦听聚合事件,使用聚合的查询模型更新Repository。

票数 3
EN

Stack Overflow用户

发布于 2019-11-28 00:51:03

传递到EventSourcingRepository中的EventSourcingRepository实现了StreamableMessageSource<M extends Message<?>>,这是一种获取聚合的方法。

虽然使用事件处理组件的框架方式可能会更好地扩展(取决于其使用/上下文的方式),但我确信事件处理组件无论如何都是由StreamableMessageSource<M extends Message<?>>驱动的。因此,如果我们想跳过框架,直接进入,我们可以这样做:

代码语言:javascript
复制
    List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
        return immediatelyAvailableStream(eventStore.openStream(
                eventStore.createTailToken() /* All events in the event store */
        ))
                .filter(e -> e instanceof DomainEventMessage)
                .map(e -> (DomainEventMessage) e)
                .map(DomainEventMessage::getAggregateIdentifier)
                .distinct()
                .collect(Collectors.toList());
    }

    /*
        Note that the stream returned by BlockingStream.asStream() will block / won't terminate
        as it waits for future elements.
     */
    static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
        Iterator<M> iterator = new Iterator<M>() {
            @Override
            public boolean hasNext() {
                return messageStream.hasNextAvailable();
            }

            @Override
            public M next() {
                try {
                    return messageStream.nextAvailable();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Didn't expect to be interrupted");
                }
            }
        };

        Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        Stream stream = StreamSupport.stream(spliterator, false);
        return (Stream)stream.onClose(messageStream::close);
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47258755

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档