首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NEventStore乐观锁

NEventStore乐观锁
EN

Stack Overflow用户
提问于 2013-08-25 15:52:46
回答 1查看 2K关注 0票数 3

我是NEventStore和事件采购的新手。在一个项目中,我想使用NEventStore来持久化聚合生成的事件,但是在正确处理并发性方面有一些问题。

如何使用乐观锁写入同一流?

假设我有两个来自两个不同线程的相同聚合的实例,这些实例在版本1中加载。然后,第一线程调用命令A和第二线程调用命令B。使用乐观锁,如果出现并发异常,聚合之一将失败。

我想使用maxRevision从加载聚合的角度打开流,但似乎CommitChanges从未失败,如果我传递了一个旧的修订版。

我错过了什么?当使用NEventStore/Event时,乐观锁是否可能/正确?

下面是我用来重现问题的代码:

代码语言:javascript
复制
namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);

                Client2(revision: 0);

                scope.Complete();
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

我希望client 2失败,因为我用旧版本打开了流。

更新26/08/2013:我已经使用server测试了相同的代码,并且似乎像预期的那样工作。

代码语言:javascript
复制
namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();

                AppendToStream_Client1(revision: 1);

                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                    .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

因此,回到我的问题:要启用乐观锁,应该在打开流时使用修订吗?还有其他可能的实现或指导方针吗?

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-08-25 16:29:22

首先,内存中的持久性实现(其主要目的是测试)不是事务感知的.在最初的示例中,客户机2将简单地将它的事件附加到流中。尝试使用支持事务的持久性存储(SQL & Raven,但不支持Mongo)运行上面的内容。

其次,在打开流时指定min/max修订用于不同的目的:

  1. 当重新补充聚合,并且没有可用的快照时,您将指定(min:0,max:int.MaxValue),因为您对检索所有事件感兴趣。
  2. 当重新补充聚合和快照可用时,可以指定(min:snapshot.Version、max:int.MaxValue)来获取自快照以来发生的所有事件。
  3. 在保存聚合时,可以指定(min:0,max:Aggregate.Version)。Aggregate.Version是在再水化过程中产生的.如果相同的聚合在其他地方同时被重新水化并保存,您将有一个竞争状态,并且会出现一个ConcurrencyException

对其中大部分内容的支持将封装在域框架中。参见AggregateBaseEventStoreRepository中的CommonDomain

第三,也是最重要的,在单个事务中更新>1流是一种代码嗅探。如果您正在执行DDD/ES,则流表示单个聚合根,根据定义,它是一个一致性边界。在事务中创建/更新多个AR会破坏这一点。NEventStore的事务支持是(勉强的)添加的,这样它就可以与其他工具一起工作,即从MSMQ/NServiceBus/任何事务读取命令并处理它,或者通过事务方式向队列发送提交消息并将其标记为这样。就我个人而言,我建议你尽量避免2PC。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/18430840

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档