首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pub-面向所有Dotnet平台的MessageBroker在C#中的应用

Pub-面向所有Dotnet平台的MessageBroker在C#中的应用
EN

Code Review用户
提问于 2019-10-28 02:23:31
回答 3查看 164关注 0票数 2
代码语言:javascript
复制
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

MessageBroker

包含线程安全结构,以保存消息类型到处理程序的映射。向这些处理程序发送消息。支持带有和不带回复的消息。

代码语言:javascript
复制
    public class MessageBroker
    {
        public ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageHandler>>> BrokeredMessageHandlers =
            new ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageHandler>>>();

        public ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageWithReplyHandler>>> BrokeredMessageWithReplyHandlers =
            new ConcurrentDictionary<(string name, Type message), ConcurrentBag<WeakReference<BrokeredMessageWithReplyHandler>>>();

        private static MessageBroker _instance;

        public static MessageBroker Instance => _instance ?? (_instance = new MessageBroker());

        public async Task Send(
            IBrokeredMessageBase message
            , Action<Guid> sent = null
            , Action<Guid> complete = null
            , CancellationToken cancellationToken = default)
        {
            var key = (message.MessageName, message.GetType());

            if (BrokeredMessageHandlers.ContainsKey(key))
            {
                var subscribers = BrokeredMessageHandlers[key].ToImmutableList();

                var tasks = new List<Task>();

                subscribers.ForEach(s => tasks.Add(Task.Factory.StartNew(() =>
                {
                    if (s.TryGetTarget(out var target) && !cancellationToken.IsCancellationRequested) target(message);
                }, cancellationToken)));

                if (!cancellationToken.IsCancellationRequested)
                {
                    sent?.Invoke(message.MessageUID);

                    await Task.Run(() => { Task.WaitAll(tasks.ToArray(), cancellationToken); }, cancellationToken);

                    if (!cancellationToken.IsCancellationRequested)
                    {
                        complete?.Invoke(message.MessageUID);
                    }
                }
            }
        }

        public async Task SendWithReply(
            BrokeredMessageWithReplyBase message
            , Action<Guid> sent = null
            , Action<Guid> complete = null
            , CancellationToken cancellationToken = default)
        {
            var key = (message.MessageName, message.GetType());

            if (BrokeredMessageWithReplyHandlers.ContainsKey(key))
            {
                var subscribers = BrokeredMessageWithReplyHandlers[key].ToImmutableList();

                var tasks = new List<Task>();

                subscribers.ForEach(s => tasks.Add(new Task(() =>
                {
                    if (s.TryGetTarget(out var target)) target(message);
                })));

                if (!cancellationToken.IsCancellationRequested)
                {
                    sent?.Invoke(message.MessageUID);

                    await Task.Run(() => { Task.WaitAll(tasks.ToArray(), cancellationToken); }, cancellationToken);

                    if (!cancellationToken.IsCancellationRequested)
                    {
                        complete?.Invoke(message.MessageUID);
                    }
                }
            }
        }

        public void AddHandler(string messageName, Type messageType, BrokeredMessageHandler brokeredMessageHandler)
        {
            var key = (messageName, messageType);

            BrokeredMessageHandlers.TryGetValue(key, out var bag);

            if (bag == null)
            {
                bag = new ConcurrentBag<WeakReference<BrokeredMessageHandler>>();
                BrokeredMessageHandlers.TryAdd(key, bag);
            }

            var reference = new WeakReference<BrokeredMessageHandler>(brokeredMessageHandler);
            if (!bag.Contains(reference))
            {
                bag.Add(reference);
            }
        }

        public void AddHandlerWithReply(string messageName, Type messageType, BrokeredMessageWithReplyHandler brokeredMessageWithReplyHandler)
        {
            var key = (messageName, messageType);

            BrokeredMessageWithReplyHandlers.TryGetValue(key, out var bag);

            if (bag == null)
            {
                bag = new ConcurrentBag<WeakReference<BrokeredMessageWithReplyHandler>>();
                BrokeredMessageWithReplyHandlers.TryAdd(key, bag);
            }

            var reference = new WeakReference<BrokeredMessageWithReplyHandler>(brokeredMessageWithReplyHandler);
            if (!bag.Contains(reference))
            {
                bag.Add(reference);
            }
        }
    }

BrokeredMessageHandler & BrokeredMessageWithReplyHandler

用于将消息发送到。

代码语言:javascript
复制
    public delegate void BrokeredMessageHandler(IBrokeredMessageBase message);

    public delegate void BrokeredMessageWithReplyHandler(BrokeredMessageWithReplyBase message);

消息(没有回复)

  • IBrokeredMessageBase是所有代理消息的基本接口。
  • BrokeredMessageBase抽象类提供了基本的实现。
  • BrokeredMessage<TParameter>通用参数化代理消息。
  • BrokeredMessage非通用参数化代理消息。
代码语言:javascript
复制
    public interface IBrokeredMessageBase
    {
        string MessageName { get; }
        Guid MessageUID { get; }
        DateTimeOffset TimestampUTC { get; }
    }

    public abstract class BrokeredMessageBase : IBrokeredMessageBase
    {
        public string MessageName { get; }
        public Guid MessageUID { get; }
        public DateTimeOffset TimestampUTC { get; } = DateTimeOffset.UtcNow;

        protected BrokeredMessageBase(string name, Guid uid = default)
        {
            MessageName = name;
            MessageUID = uid != default ? uid : Guid.NewGuid();
        }
    }

    public class BrokeredMessage<TParameter> : BrokeredMessageBase
    {
        public TParameter Parameter { get; }
        public BrokeredMessage(TParameter parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }

    public class BrokeredMessage : BrokeredMessageBase
    {
        public object Parameter { get; }
        public BrokeredMessage(object parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }

消息(带有回复)

  • BrokeredMessageWithReplyBase抽象类提供了基本的实现。
  • BrokeredMessageWithReply<TParameter>通用参数化代理消息。
  • BrokeredMessageWithReply非通用参数化代理消息。
代码语言:javascript
复制
    public abstract class BrokeredMessageWithReplyBase : BrokeredMessageBase
    {
        public ConcurrentBag<object> Replies { get; }

        protected BrokeredMessageWithReplyBase(string name, Guid uid = default) : base(name, uid)
        {
            Replies = new ConcurrentBag<object>();
        }
    }

    public class BrokeredMessageWithReply<TParameter> : BrokeredMessageBase
    {
        public TParameter Parameter { get; }
        public BrokeredMessageWithReply(TParameter parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }

    public class BrokeredMessageWithReply : BrokeredMessageBase
    {
        public object Parameter { get; }
        public BrokeredMessageWithReply(object parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }
EN

回答 3

Code Review用户

发布于 2019-10-28 07:21:32

MessageBroker

  • 公共BrokeredMessageHandlersBrokeredMessageWithReplyHandlers都应该是readonly,因为您不希望有人在类之外将它们设置为null
  • 对于两个方法名public async Task Send()public async Task SendWithReply(),应该追加Async后缀。请参阅:https://docs.microsoft.com/en-us/dotnet/csharp/async#important-info-and-advice
  • 如果需要获取Dictionary<TKey, TValue>的值,则不应该将ContainsKey()Item属性getter一起使用,而应与TryGetValue()一起使用,因为通过将ContainsKey()Item getter结合使用,您将检查该键是否存在两次。从参考源公共bool ContainsKey(TKey key) { if (key == null)抛出新ArgumentNullException("key");TValue throwAwayValue;返回TryGetValue(key,out throwAwayValue);} public TValue此TKey密钥{ get { TValue值;if (!TryGetValue(key,out值)){抛出新KeyNotFoundException();}返回值;} set {如果( ==键为null)抛出新的ArgumentNullException("key");TValue虚拟;TryAddInternal(键、值、真、真、外虚拟);}}
票数 2
EN

Code Review用户

发布于 2019-10-28 11:26:50

如果不理解将要在其中使用的场景,就很难对某些内容进行检查。因此,请把这看作是一些可能或不相关的随机想法:

  1. Task.Run内部等待几乎不是一个好主意。您使用线程池中的线程来不做任何事情。如果可能的话,尝试使用事件循环。
  2. SendWithReply中,我看不到任务在哪里运行。而且,看起来它从Send复制了很多代码。
  3. AddHandler并不是线程安全的.如果两个线程写入相同的键,其中一个可能会失败,而且永远不会知道。
  4. 无法想到发送回调的任何用途,因为在调用时,任务可能已经完成。
票数 2
EN

Code Review用户

发布于 2019-10-29 12:45:11

以下是基于反馈的更新代码:

代码语言:javascript
复制
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Boltron.UI.UWP.Commands
{
    public class MessageBroker
    {
        private readonly ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessage>>> _brokeredMessageHandlers =
            new ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessage>>>();

        private readonly ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessageWithReply>>> _brokeredMessageWithReplyHandlers =
            new ConcurrentDictionary<(string name, Type message), ConcurrentBag<Action<IBrokeredMessageWithReply>>>();

        public static MessageBroker Instance { get; } = new MessageBroker();

        public async Task SendAsync(
            IBrokeredMessage message
            , Action<Guid> sent = null
            , Action<Guid> complete = null
            , CancellationToken cancellationToken = default)
        {
            var key = (message.MessageName, message.GetType());

            if (_brokeredMessageHandlers.TryGetValue(key, out var bag))
            {
                var subscribers = bag.ToImmutableList();

                await SendMessageAsync(message, subscribers, sent, complete, cancellationToken);
            }
        }

        public async Task SendWithReplyAsync(
            IBrokeredMessageWithReply message
            , Action<Guid> sent = null
            , Action<Guid> complete = null
            , CancellationToken cancellationToken = default)
        {
            var key = (message.MessageName, message.GetType());

            if (_brokeredMessageWithReplyHandlers.TryGetValue(key, out var bag))
            {
                var subscribers = bag.Cast<Action<IBrokeredMessage>>().ToImmutableList();

                await SendMessageAsync(message, subscribers, sent, complete, cancellationToken);
            }
        }

        private async Task SendMessageAsync(
            IBrokeredMessage message
            , ImmutableList<Action<IBrokeredMessage>> subscribers
            , Action<Guid> sent = null
            , Action<Guid> complete = null
            , CancellationToken cancellationToken = default)
        {
            var tasks = new List<Task>();

            subscribers.ForEach(s => tasks.Add(Task.Factory.StartNew(() =>
            {
                if (s != null && !cancellationToken.IsCancellationRequested)
                {
                    s(message);
                }
            }, cancellationToken)));

            if (!cancellationToken.IsCancellationRequested)
            {
                sent?.Invoke(message.MessageUid);

                await Task.Run(
                    () => Task.WaitAll(tasks.ToArray(), cancellationToken)
                    , cancellationToken);

                if (!cancellationToken.IsCancellationRequested)
                {
                    complete?.Invoke(message.MessageUid);
                }
            }

        }

        public void AddHandler(string messageName
            , Type messageType
            , Action<IBrokeredMessage> brokeredMessageHandler)
        {
            var key = (messageName, messageType);

            _brokeredMessageHandlers.TryGetValue(key, out var bag);

            if (bag == null)
            {
                bag = new ConcurrentBag<Action<IBrokeredMessage>>();
                _brokeredMessageHandlers.TryAdd(key, bag);
            }

            if (!bag.Contains(brokeredMessageHandler))
            {
                bag.Add(brokeredMessageHandler);
            }
        }

        public void AddHandlerWithReply(string messageName
            , Type messageType
            , Action<IBrokeredMessageWithReply> brokeredMessageWithReplyHandler)
        {
            var key = (messageName, messageType);

            _brokeredMessageWithReplyHandlers.TryGetValue(key, out var bag);

            if (bag == null)
            {
                bag = new ConcurrentBag<Action<IBrokeredMessageWithReply>>();
                _brokeredMessageWithReplyHandlers.TryAdd(key, bag);
            }

            if (!bag.Contains(brokeredMessageWithReplyHandler))
            {
                bag.Add(brokeredMessageWithReplyHandler);
            }
        }
    }

    public interface IBrokeredMessage
    {
        string MessageName { get; }
        Guid MessageUid { get; }
        DateTimeOffset TimestampUtc { get; }
    }

    public abstract class BrokeredMessageBase : IBrokeredMessage
    {
        public string MessageName { get; }
        public Guid MessageUid { get; }
        public DateTimeOffset TimestampUtc { get; } = DateTimeOffset.UtcNow;

        protected BrokeredMessageBase(string name, Guid uid = default)
        {
            MessageName = name;
            MessageUid = uid != default ? uid : Guid.NewGuid();
        }
    }

    public class BrokeredMessage<TParameter> : BrokeredMessageBase
    {
        public TParameter Parameter { get; }
        public BrokeredMessage(TParameter parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }

    public class BrokeredMessage : BrokeredMessageBase
    {
        public object Parameter { get; }
        public BrokeredMessage(object parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }

    public interface IBrokeredMessageWithReply : IBrokeredMessage
    {

    }

    public abstract class BrokeredMessageWithReplyBase<TReturn> : BrokeredMessageBase, IBrokeredMessageWithReply
    {
        private readonly object _owner;
        private ConcurrentBag<TReturn> Replies { get; }

        protected BrokeredMessageWithReplyBase(string name, object owner, Guid uid = default) : base(name, uid)
        {
            _owner = owner;
            Replies = new ConcurrentBag<TReturn>();
        }

        public void AddReply(TReturn reply)
        {
            Replies.Add(reply);
        }

        public IEnumerable<TReturn> GetReplies(object claimant)
        {
            if (claimant != _owner) yield break;

            foreach (var item in Replies)
            {
                yield return item;
            }
        }
    }

    public class BrokeredMessageWithReply<TParameter, TReturn> : BrokeredMessageWithReplyBase<TReturn>
    {
        public TParameter Parameter { get; }
        public BrokeredMessageWithReply(TParameter parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }

    public class BrokeredMessageWithReply<TReturn> : BrokeredMessageWithReplyBase<TReturn>
    {
        public object Parameter { get; }
        public BrokeredMessageWithReply(object parameter, string name, Guid uid = default) : base(name, uid)
        {
            Parameter = parameter;
        }
    }
}
```
代码语言:javascript
复制
票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/231393

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档