当您使用Session-Per-Request模式时,您在使用NHibernate的3层应用程序中使用什么模式/体系结构,该应用程序需要支持事务失败时的重试?(因为ISession在异常之后变得无效,即使这是死锁、超时或活锁异常)。
发布于 2011-02-10 20:30:55
Note2现在,我永远不会将写事务放在web项目中-而是使用消息+队列,并让一个工作者在后台处理旨在完成事务工作的消息。
然而,我仍然会使用事务读取来获得一致的数据;再加上MVCC/Snapshot隔离,从web项目中。在这种情况下,您会发现每个事务的每个请求的会话是非常好的。
备注1这篇文章的想法已经放在了Castle Transactions framework和我的新NHibernate Facility中。
好了,大意是这样的。假设您想要为一个客户创建一个未最终确定的订单。你有某种类型的GUI,例如浏览器/MVC应用程序,它用相关信息创建一个新的数据结构(或者你从网络上获得这个数据结构):
[Serializable]
class CreateOrder /*: IMessage*/
{
// immutable
private readonly string _CustomerName;
private readonly decimal _Total;
private readonly Guid _CustomerId;
public CreateOrder(string customerName, decimal total, Guid customerId)
{
_CustomerName = customerName;
_Total = total;
_CustomerId = customerId;
}
// put ProtoBuf attribute
public string CustomerName
{
get { return _CustomerName; }
}
// put ProtoBuf attribute
public decimal Total
{
get { return _Total; }
}
// put ProtoBuf attribute
public Guid CustomerId
{
get { return _CustomerId; }
}
}你需要一些东西来处理它。这可能是某种服务总线中的命令处理程序。“命令处理程序”是众多命令处理程序中的一个,您也可以将其称为“服务”、“域服务”或“消息处理程序”。如果你在做函数式编程,它就是你的消息框实现,或者如果你在做Erlang或Akka,它就是一个Actor。
class CreateOrderHandler : IHandle<CreateOrder>
{
public void Handle(CreateOrder command)
{
With.Policy(IoC.Resolve<ISession>, s => s.BeginTransaction(), s =>
{
var potentialCustomer = s.Get<PotentialCustomer>(command.CustomerId);
potentialCustomer.CreateOrder(command.Total);
return potentialCustomer;
}, RetryPolicies.ExponentialBackOff.RetryOnLivelockAndDeadlock(3));
}
}
interface IHandle<T> /* where T : IMessage */
{
void Handle(T command);
}上面显示了您可能为此给定问题域(应用程序状态/事务处理)选择的API使用情况。
With的实现:
static class With
{
internal static void Policy(Func<ISession> getSession,
Func<ISession, ITransaction> getTransaction,
Func<ISession, EntityBase /* abstract 'entity' base class */> executeAction,
IRetryPolicy policy)
{
//http://fabiomaulo.blogspot.com/2009/06/improving-ado-exception-management-in.html
while (true)
{
using (var session = getSession())
using (var t = getTransaction(session))
{
var entity = executeAction(session);
try
{
// we might not always want to update; have another level of indirection if you wish
session.Update(entity);
t.Commit();
break; // we're done, stop looping
}
catch (ADOException e)
{
// need to clear 2nd level cache, or we'll get 'entity associated with another ISession'-exception
// but the session is now broken in all other regards will will throw exceptions
// if you prod it in any other way
session.Evict(entity);
if (!t.WasRolledBack) t.Rollback(); // will back our transaction
// this would need to be through another level of indirection if you support more databases
var dbException = ADOExceptionHelper.ExtractDbException(e) as SqlException;
if (policy.PerformRetry(dbException)) continue;
throw; // otherwise, we stop by throwing the exception back up the layers
}
}
}
}
}正如您所看到的,我们需要一个新的工作单元;每次出现错误时,都会使用ISession。这就是为什么循环在Using语句/块的外部。拥有函数等同于拥有工厂实例,只是我们直接在对象实例上调用,而不是在对象实例上调用方法。它使得API imho成为一个更好的调用者。
我们希望相当流畅地处理如何执行重试,因此我们有一个可以由不同的处理程序实现的接口,称为IRetryHandler。应该可以将它们链接到您想要强制实施的控制流的每个方面(是的,它非常接近AOP)。类似于AOP的工作原理,返回值用于控制控制流,但只是以真/假的方式,这是我们的需求。
interface IRetryPolicy
{
bool PerformRetry(SqlException ex);
}AggregateRoot,PotentialCustomer是一个具有生命周期的实体。这是您的*.hbm.xml文件/FluentNHibernate所映射的内容。
它有一个与发送的命令1:1对应的方法。这使得命令处理程序完全易于阅读。
此外,使用带有鸭子类型的动态语言,它将允许您将命令的类型名映射到方法,类似于Ruby/Smalltalk的方式。
如果你做的是事件源,事务处理应该是相似的,除了事务不会与NHibernate的类似接口。由此推论,您将保存通过调用CreateOrder(十进制)创建的事件,并为您的实体提供一种从存储中重新读取保存的事件的机制。
最后要注意的一点是,我覆盖了我创建的三个方法。这是NHibernate方面的一个要求,因为它需要一种方法来知道一个实体何时与另一个实体相等,如果它们在集合/包中。关于我的实现here的更多信息。无论如何,这是示例代码,我现在并不关心我的客户,所以我没有实现它们:
sealed class PotentialCustomer : EntityBase
{
public void CreateOrder(decimal total)
{
// validate total
// run business rules
// create event, save into event sourced queue as transient event
// update private state
}
public override bool IsTransient() { throw new NotImplementedException(); }
protected override int GetTransientHashCode() { throw new NotImplementedException(); }
protected override int GetNonTransientHashCode() { throw new NotImplementedException(); }
}我们需要一种创建重试策略的方法。当然,我们可以通过许多方式做到这一点。在这里,我将fluent接口与与静态方法的类型相同的同一对象的实例组合在一起。我显式地实现了这个接口,这样在fluent接口中就看不到其他方法了。此接口仅使用我下面的“示例”实现。
internal class RetryPolicies : INonConfiguredPolicy
{
private readonly IRetryPolicy _Policy;
private RetryPolicies(IRetryPolicy policy)
{
if (policy == null) throw new ArgumentNullException("policy");
_Policy = policy;
}
public static readonly INonConfiguredPolicy ExponentialBackOff =
new RetryPolicies(new ExponentialBackOffPolicy(TimeSpan.FromMilliseconds(200)));
IRetryPolicy INonConfiguredPolicy.RetryOnLivelockAndDeadlock(int retries)
{
return new ChainingPolicy(new[] {new SqlServerRetryPolicy(retries), _Policy});
}
}我们需要一个接口来部分完成对fluent接口的调用。这为我们提供了类型安全。因此,在完成策略配置之前,我们需要两个取消引用运算符(即‘句号’-- (.)),远离我们的静态类型。
internal interface INonConfiguredPolicy
{
IRetryPolicy RetryOnLivelockAndDeadlock(int retries);
}可以解析链接策略。它的实现检查它的所有子对象是否都返回continue,并且在检查的同时,还执行其中的逻辑。
internal class ChainingPolicy : IRetryPolicy
{
private readonly IEnumerable<IRetryPolicy> _Policies;
public ChainingPolicy(IEnumerable<IRetryPolicy> policies)
{
if (policies == null) throw new ArgumentNullException("policies");
_Policies = policies;
}
public bool PerformRetry(SqlException ex)
{
return _Policies.Aggregate(true, (val, policy) => val && policy.PerformRetry(ex));
}
}此策略会让当前线程休眠一段时间;有时数据库会超载,而让多个读取器/写入器不断尝试读取将是对数据库的实际DOS攻击(参见几个月前facebook崩溃时发生的情况,因为它们的缓存服务器同时查询它们的数据库)。
internal class ExponentialBackOffPolicy : IRetryPolicy
{
private readonly TimeSpan _MaxWait;
private TimeSpan _CurrentWait = TimeSpan.Zero; // initially, don't wait
public ExponentialBackOffPolicy(TimeSpan maxWait)
{
_MaxWait = maxWait;
}
public bool PerformRetry(SqlException ex)
{
Thread.Sleep(_CurrentWait);
_CurrentWait = _CurrentWait == TimeSpan.Zero ? TimeSpan.FromMilliseconds(20) : _CurrentWait + _CurrentWait;
return _CurrentWait <= _MaxWait;
}
}同样,在任何好的基于SQL的系统中,我们都需要处理死锁。我们不能真正深入地计划这些,特别是在使用NHibernate时,除了保持严格的事务策略--没有隐式事务;而且要小心使用Open-Session- in -View。还有笛卡尔乘积问题/N+1选择问题,如果你正在获取大量数据,你需要记住这一点。相反,您可以使用Multi-Query或HQL的'fetch‘关键字。
internal class SqlServerRetryPolicy : IRetryPolicy
{
private int _Tries;
private readonly int _CutOffPoint;
public SqlServerRetryPolicy(int cutOffPoint)
{
if (cutOffPoint < 1) throw new ArgumentOutOfRangeException("cutOffPoint");
_CutOffPoint = cutOffPoint;
}
public bool PerformRetry(SqlException ex)
{
if (ex == null) throw new ArgumentNullException("ex");
// checks the ErrorCode property on the SqlException
return SqlServerExceptions.IsThisADeadlock(ex) && ++_Tries < _CutOffPoint;
}
}一个帮助类,使代码读起来更好。
internal static class SqlServerExceptions
{
public static bool IsThisADeadlock(SqlException realException)
{
return realException.ErrorCode == 1205;
}
}不要忘了在IConnectionFactory中处理网络故障(通过委托,可能是通过实现IConnection)。
PS: Session-per-request是一种不完整的模式,如果你不仅仅是在读的话。特别是如果您使用与写入相同的ISession进行读取,并且您没有对读取进行排序,使得它们总是在写入之前。
https://stackoverflow.com/questions/4010265
复制相似问题