我一直在使用ServiceStack message /Client来增强我平台中基于消息的体系结构,并且它一直在完美地工作。我现在正在尝试做一些我认为不受SS消息生产者/消费者支持的事情。
本质上,我是在一个集中的数据中心发送消息(事件),在一个不可靠的网络上,我在全美有大约2000个分散的节点,可能需要知道这个事件,但是事件只需要针对~2000节点中的一个。我需要使用Pub/Sub的任意命名的通道的灵活性,但是MQ的持久性。我从Pub/Sub开始,但是网络太不可靠了,所以我已经将解决方案转移到使用RedisMQServer了。我让它工作,但想确保我没有遗漏在界面上的东西。我很好奇SS的创建者是否考虑过这个用例,如果是的话,讨论的结果是什么?这确实与使用POCO来驱动消息消费的结果/行为的概念作斗争。也许这就是原因?
这是我的制片人
public ExpressLightServiceResponse Get(ExpressLightServiceRequest query)
{
var result = new ExpressLightServiceResponse();
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
using (var messageProducer = _messageService.CreateMessageProducer())
{
var message = MessageFactory.Create(newType.CreateInstance());
messageProducer.Publish(message);
}
return result;
}这是我的消费者
public class ServerAppHost : AppHostHttpListenerBase
{
private readonly string _store;
public string StoreQueue => $"EventA{_store}";
public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly)
{
_store = store;
}
public override void Configure(Container container)
{
container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString));
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
var mi = typeof(Temp).GetMethod("Foo");
var fooRef = mi.MakeGenericMethod(newType);
fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null);
}
}
public class Temp
{
private readonly IRedisClientsManager _redisClientsManager;
public Temp(IRedisClientsManager redisClientsManager)
{
_redisClientsManager = redisClientsManager;
}
public void Foo<T>()
{
var mqService = new RedisMqServer(_redisClientsManager);
mqService.RegisterHandler<T>(DoWork);
mqService.Start();
}
private object DoWork<T>(IMessage<T> arg)
{
//Do work
return null;
}
}这给我的是Pub/Sub具有队列持久性的灵活性。有没有人看到/知道一种更“本土化”的方法来实现这一点?
发布于 2016-11-10 13:46:25
应该只有一个MQ主机在您的AppHost中注册,所以我首先将它从包装器类中删除,并让它注册处理程序,例如:
public override void Configure(Container container)
{
//...
container.Register<IMessageService>(
c => new RedisMqServer(c.Resolve<IRedisClientsManager>());
var mqServer = container.Resolve<IMessageService>();
fooRef.Invoke(new Temp(mqServer), null);
mqServer.Start();
}
public class Temp
{
private readonly IMessageService mqServer;
public Temp(IMessageService mqServer)
{
this.mqServer = mqServer;
}
public void Foo<T>() => mqService.RegisterHandler<T>(DoWork);
}但是这种方法不适合ServiceStack,因为它鼓励使用代码优先消息,它定义了客户/服务器用来处理发送和接收的消息的服务契约。因此,如果您想使用ServiceStack发送自定义消息,我建议每个消息都有一个单独的类,否则有一个泛型类型,比如SendEvent,其中消息或事件类型是类中的一个属性。
否则,如果您想继续使用自定义消息不使用RedisMqServer,您可以只使用一个专用MQ类兔MQ或者如果您喜欢使用一个直接红表 --这是所有Redis在下面使用的数据结构。
https://stackoverflow.com/questions/40512758
复制相似问题