using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;MessageBroker类包含线程安全结构,以保存消息类型到处理程序的映射。向这些处理程序发送消息。支持带有和不带回复的消息。
public class MessageBroker
{
public ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageHandler>>> BrokeredMessageHandlers =
new ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageHandler>>>();
public ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageWithReplyHandler>>> BrokeredMessageWithReplyHandlers =
new ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageWithReplyHandler>>>();
private static MessageBroker _instance;
public static MessageBroker Instance => _instance ?? (_instance = new MessageBroker());
public async Task Send(
IBrokeredMessageBase message
, Action<Guid> sent = null
, Action<Guid> complete = null
, CancellationToken cancellationToken = default)
{
var key = (message.MessageName, message.GetType());
if (BrokeredMessageHandlers.ContainsKey(key))
{
var subscribers = BrokeredMessageHandlers[key].ToImmutableList();
var tasks = new List<Task>();
subscribers.ForEach(s => tasks.Add(Task.Factory.StartNew(() =>
{
if (s.TryGetTarget(out var target) && !cancellationToken.IsCancellationRequested) target(message);
}, cancellationToken)));
if (!cancellationToken.IsCancellationRequested)
{
sent?.Invoke(message.MessageUID);
await Task.Run(() => { Task.WaitAll(tasks.ToArray(), cancellationToken); }, cancellationToken);
if (!cancellationToken.IsCancellationRequested)
{
complete?.Invoke(message.MessageUID);
}
}
}
}
public async Task SendWithReply(
BrokeredMessageWithReplyBase message
, Action<Guid> sent = null
, Action<Guid> complete = null
, CancellationToken cancellationToken = default)
{
var key = (message.MessageName, message.GetType());
if (BrokeredMessageWithReplyHandlers.ContainsKey(key))
{
var subscribers = BrokeredMessageWithReplyHandlers[key].ToImmutableList();
var tasks = new List<Task>();
subscribers.ForEach(s => tasks.Add(new Task(() =>
{
if (s.TryGetTarget(out var target)) target(message);
})));
if (!cancellationToken.IsCancellationRequested)
{
sent?.Invoke(message.MessageUID);
await Task.Run(() => { Task.WaitAll(tasks.ToArray(), cancellationToken); }, cancellationToken);
if (!cancellationToken.IsCancellationRequested)
{
complete?.Invoke(message.MessageUID);
}
}
}
}
public void AddHandler(string messageName, Type messageType, BrokeredMessageHandler brokeredMessageHandler)
{
var key = (messageName, messageType);
BrokeredMessageHandlers.TryGetValue(key, out var bag);
if (bag == null)
{
bag = new ConcurrentBag<WeakReference<BrokeredMessageHandler>>();
BrokeredMessageHandlers.TryAdd(key, bag);
}
var reference = new WeakReference<BrokeredMessageHandler>(brokeredMessageHandler);
if (!bag.Contains(reference))
{
bag.Add(reference);
}
}
public void AddHandlerWithReply(string messageName, Type messageType, BrokeredMessageWithReplyHandler brokeredMessageWithReplyHandler)
{
var key = (messageName, messageType);
BrokeredMessageWithReplyHandlers.TryGetValue(key, out var bag);
if (bag == null)
{
bag = new ConcurrentBag<WeakReference<BrokeredMessageWithReplyHandler>>();
BrokeredMessageWithReplyHandlers.TryAdd(key, bag);
}
var reference = new WeakReference<BrokeredMessageWithReplyHandler>(brokeredMessageWithReplyHandler);
if (!bag.Contains(reference))
{
bag.Add(reference);
}
}
}BrokeredMessageHandler & BrokeredMessageWithReplyHandler用于将消息发送到。
public delegate void BrokeredMessageHandler(IBrokeredMessageBase message);
public delegate void BrokeredMessageWithReplyHandler(BrokeredMessageWithReplyBase message);IBrokeredMessageBase是所有代理消息的基本接口。BrokeredMessageBase抽象类提供了基本的实现。BrokeredMessage<TParameter>通用参数化代理消息。BrokeredMessage非通用参数化代理消息。 public interface IBrokeredMessageBase
{
string MessageName { get; }
Guid MessageUID { get; }
DateTimeOffset TimestampUTC { get; }
}
public abstract class BrokeredMessageBase : IBrokeredMessageBase
{
public string MessageName { get; }
public Guid MessageUID { get; }
public DateTimeOffset TimestampUTC { get; } = DateTimeOffset.UtcNow;
protected BrokeredMessageBase(string name, Guid uid = default)
{
MessageName = name;
MessageUID = uid != default ? uid : Guid.NewGuid();
}
}
public class BrokeredMessage<TParameter> : BrokeredMessageBase
{
public TParameter Parameter { get; }
public BrokeredMessage(TParameter parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}
public class BrokeredMessage : BrokeredMessageBase
{
public object Parameter { get; }
public BrokeredMessage(object parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}BrokeredMessageWithReplyBase抽象类提供了基本的实现。BrokeredMessageWithReply<TParameter>通用参数化代理消息。BrokeredMessageWithReply非通用参数化代理消息。 public abstract class BrokeredMessageWithReplyBase : BrokeredMessageBase
{
public ConcurrentBag<object> Replies { get; }
protected BrokeredMessageWithReplyBase(string name, Guid uid = default) : base(name, uid)
{
Replies = new ConcurrentBag<object>();
}
}
public class BrokeredMessageWithReply<TParameter> : BrokeredMessageBase
{
public TParameter Parameter { get; }
public BrokeredMessageWithReply(TParameter parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}
public class BrokeredMessageWithReply : BrokeredMessageBase
{
public object Parameter { get; }
public BrokeredMessageWithReply(object parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}发布于 2019-10-28 07:21:32
MessageBrokerBrokeredMessageHandlers和BrokeredMessageWithReplyHandlers都应该是readonly,因为您不希望有人在类之外将它们设置为null。public async Task Send()和public async Task SendWithReply(),应该追加Async后缀。请参阅:https://docs.microsoft.com/en-us/dotnet/csharp/async#important-info-and-adviceDictionary<TKey, TValue>的值,则不应该将ContainsKey()与Item属性getter一起使用,而应与TryGetValue()一起使用,因为通过将ContainsKey()与Item getter结合使用,您将检查该键是否存在两次。从参考源公共bool ContainsKey(TKey key) { if (key == null)抛出新ArgumentNullException("key");TValue throwAwayValue;返回TryGetValue(key,out throwAwayValue);} public TValue此TKey密钥{ get { TValue值;if (!TryGetValue(key,out值)){抛出新KeyNotFoundException();}返回值;} set {如果( ==键为null)抛出新的ArgumentNullException("key");TValue虚拟;TryAddInternal(键、值、真、真、外虚拟);}}发布于 2019-10-28 11:26:50
如果不理解将要在其中使用的场景,就很难对某些内容进行检查。因此,请把这看作是一些可能或不相关的随机想法:
Task.Run内部等待几乎不是一个好主意。您使用线程池中的线程来不做任何事情。如果可能的话,尝试使用事件循环。SendWithReply中,我看不到任务在哪里运行。而且,看起来它从Send复制了很多代码。AddHandler并不是线程安全的.如果两个线程写入相同的键,其中一个可能会失败,而且永远不会知道。发布于 2019-10-29 12:45:11
以下是基于反馈的更新代码:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Boltron.UI.UWP.Commands
{
public class MessageBroker
{
private readonly ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessage>>> _brokeredMessageHandlers =
new ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessage>>>();
private readonly ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessageWithReply>>> _brokeredMessageWithReplyHandlers =
new ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessageWithReply>>>();
public static MessageBroker Instance { get; } = new MessageBroker();
public async Task SendAsync(
IBrokeredMessage message
, Action<Guid> sent = null
, Action<Guid> complete = null
, CancellationToken cancellationToken = default)
{
var key = (message.MessageName, message.GetType());
if (_brokeredMessageHandlers.TryGetValue(key, out var bag))
{
var subscribers = bag.ToImmutableList();
await SendMessageAsync(message, subscribers, sent, complete, cancellationToken);
}
}
public async Task SendWithReplyAsync(
IBrokeredMessageWithReply message
, Action<Guid> sent = null
, Action<Guid> complete = null
, CancellationToken cancellationToken = default)
{
var key = (message.MessageName, message.GetType());
if (_brokeredMessageWithReplyHandlers.TryGetValue(key, out var bag))
{
var subscribers = bag.Cast<Action<IBrokeredMessage>>().ToImmutableList();
await SendMessageAsync(message, subscribers, sent, complete, cancellationToken);
}
}
private async Task SendMessageAsync(
IBrokeredMessage message
, ImmutableList<Action<IBrokeredMessage>> subscribers
, Action<Guid> sent = null
, Action<Guid> complete = null
, CancellationToken cancellationToken = default)
{
var tasks = new List<Task>();
subscribers.ForEach(s => tasks.Add(Task.Factory.StartNew(() =>
{
if (s != null && !cancellationToken.IsCancellationRequested)
{
s(message);
}
}, cancellationToken)));
if (!cancellationToken.IsCancellationRequested)
{
sent?.Invoke(message.MessageUid);
await Task.Run(
() => Task.WaitAll(tasks.ToArray(), cancellationToken)
, cancellationToken);
if (!cancellationToken.IsCancellationRequested)
{
complete?.Invoke(message.MessageUid);
}
}
}
public void AddHandler(string messageName
, Type messageType
, Action<IBrokeredMessage> brokeredMessageHandler)
{
var key = (messageName, messageType);
_brokeredMessageHandlers.TryGetValue(key, out var bag);
if (bag == null)
{
bag = new ConcurrentBag<Action<IBrokeredMessage>>();
_brokeredMessageHandlers.TryAdd(key, bag);
}
if (!bag.Contains(brokeredMessageHandler))
{
bag.Add(brokeredMessageHandler);
}
}
public void AddHandlerWithReply(string messageName
, Type messageType
, Action<IBrokeredMessageWithReply> brokeredMessageWithReplyHandler)
{
var key = (messageName, messageType);
_brokeredMessageWithReplyHandlers.TryGetValue(key, out var bag);
if (bag == null)
{
bag = new ConcurrentBag<Action<IBrokeredMessageWithReply>>();
_brokeredMessageWithReplyHandlers.TryAdd(key, bag);
}
if (!bag.Contains(brokeredMessageWithReplyHandler))
{
bag.Add(brokeredMessageWithReplyHandler);
}
}
}
public interface IBrokeredMessage
{
string MessageName { get; }
Guid MessageUid { get; }
DateTimeOffset TimestampUtc { get; }
}
public abstract class BrokeredMessageBase : IBrokeredMessage
{
public string MessageName { get; }
public Guid MessageUid { get; }
public DateTimeOffset TimestampUtc { get; } = DateTimeOffset.UtcNow;
protected BrokeredMessageBase(string name, Guid uid = default)
{
MessageName = name;
MessageUid = uid != default ? uid : Guid.NewGuid();
}
}
public class BrokeredMessage<TParameter> : BrokeredMessageBase
{
public TParameter Parameter { get; }
public BrokeredMessage(TParameter parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}
public class BrokeredMessage : BrokeredMessageBase
{
public object Parameter { get; }
public BrokeredMessage(object parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}
public interface IBrokeredMessageWithReply : IBrokeredMessage
{
}
public abstract class BrokeredMessageWithReplyBase<TReturn> : BrokeredMessageBase, IBrokeredMessageWithReply
{
private readonly object _owner;
private ConcurrentBag<TReturn> Replies { get; }
protected BrokeredMessageWithReplyBase(string name, object owner, Guid uid = default) : base(name, uid)
{
_owner = owner;
Replies = new ConcurrentBag<TReturn>();
}
public void AddReply(TReturn reply)
{
Replies.Add(reply);
}
public IEnumerable<TReturn> GetReplies(object claimant)
{
if (claimant != _owner) yield break;
foreach (var item in Replies)
{
yield return item;
}
}
}
public class BrokeredMessageWithReply<TParameter, TReturn> : BrokeredMessageWithReplyBase<TReturn>
{
public TParameter Parameter { get; }
public BrokeredMessageWithReply(TParameter parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}
public class BrokeredMessageWithReply<TReturn> : BrokeredMessageWithReplyBase<TReturn>
{
public object Parameter { get; }
public BrokeredMessageWithReply(object parameter, string name, Guid uid = default) : base(name, uid)
{
Parameter = parameter;
}
}
}
```https://codereview.stackexchange.com/questions/231393
复制相似问题