我有两个系统,一个用于发布消息,另一个用于使用它们。两者都使用Masstransit (与RabbitMQ一起使用),并使用ASP.Net web 2和OWIN (以及Autofac作为IoC容器)实现。如果我的使用者没有依赖项,,一切都很好,但是当我向使用者注入依赖关系时,消费方法永远不会被执行(初始化期间没有抛出错误)。
这是相关的Publisher代码:
//Startup.cs
public class Startup
{
public void Configuration(IAppBuilder app)
{
HttpConfiguration config = new HttpConfiguration();
IContainer container = null;
var builder = new ContainerBuilder();
builder.Register(context =>
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
{
settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
});
});
return busControl;
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();
// Register Web API controllers
builder.RegisterApiControllers(Assembly.GetExecutingAssembly());
// Resolve dependencies
container = builder.Build();
config.DependencyResolver = AutofacWebApiDependencyResolver(container);
WebApiConfig.Register(config);
SwaggerConfig.Register(config);
app.UseCors(CorsOptions.AllowAll);
// Register the Autofac middleware FIRST.
app.UseAutofacMiddleware(container);
app.UseWebApi(config);
// Starts MassTransit Service bus, and registers stopping of bus on app dispose
var bus = container.Resolve<IBusControl>();
var busHandle = bus.StartAsync();
var properties = new AppProperties(app.Properties);
if (properties.OnAppDisposing != CancellationToken.None)
{
properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
}
}
}
// Controller
public IHttpActionResult Post()
{
_bus.Publish<IFooMessage>(new
{
Foo = "Foo"
});
return Ok();
}这是相关的消费者代码:
// Startup.cs
public class Startup
{
public void Configuration(IAppBuilder app)
{
HttpConfiguration config = new HttpConfiguration();
IContainer container = null;
var builder = new ContainerBuilder();
builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();
builder.RegisterModule<BusModule>();
builder.RegisterModule<ConsumersModule>();
// Register Web API controllers
builder.RegisterApiControllers(Assembly.GetExecutingAssembly());
// Resolve dependencies
container = builder.Build();
config.DependencyResolver = new AutofacWebApiDependencyResolver(container);
WebApiConfig.Register(config);
SwaggerConfig.Register(config);
app.UseCors(CorsOptions.AllowAll);
// Register the Autofac middleware FIRST.
app.UseAutofacMiddleware(container);
app.UseWebApi(config);
// Starts MassTransit Service bus, and registers stopping of bus on app dispose
var bus = container.Resolve<IBusControl>();
var busHandle = bus.StartAsync();
var properties = new AppProperties(app.Properties);
if (properties.OnAppDisposing != CancellationToken.None)
{
properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
}
}
}
// BusModule.cs
public class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.Register(context =>
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
{
settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
});
cfg.ReceiveEndpoint(rabbitMqHost, "IP.AgilePoint.queue", ec =>
{
ec.LoadFrom(context);
});
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}
// ConsumerModule.cs
public class ConsumersModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterType<FooConsumer>();
}
}
// FooConsumer.cs
public class FooConsumer : IConsumer<IFooMessage>
{
private IFooService _service;
public FooConsumer(IFooService service)
{
_service = service;
}
public Task Consume(ConsumeContext<IFooMessage> context)
{
IFooMessage @event = context.Message;
_service.DoStuff(@event.Foo);
return Task.FromResult(context.Message);
}
}注意,我的FooConsumer有一个IFooService的依赖项(构造函数)。我已经跟踪了Masstransit文件,但我无法让它起作用。我做错了什么?
框架版本:
更新:
代码可以找到在这个Github存储库中
发布于 2017-10-14 10:05:30
最后我发现我做错了什么。由于某些原因,我无法在RabbitMQ管理中看到队列。当我能够看到错误队列时,我注意到了以下错误:

我以这样的方式注册了我的IFooService:
builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();InstancePerRequest()是导致错误的原因。如果我向builder.RegisterType<FooService>().As<IFooService>()注册服务,一切都很好。我认为这是因为我的总线实例是作为单例(注册为SingleIsntace())工作的。使用web项目托管我的总线/消费者这一事实使我感到困惑。
总之,感谢@consumers和@Alexey为我指明了实现的正确方向(使用MT扩展来注册消费者)。
发布于 2017-10-13 16:40:34
我建议查看针对Autofac的文档,它是通过扩展库得到完全支持的容器。
http://masstransit-project.com/MassTransit/usage/containers/autofac.html
https://stackoverflow.com/questions/46733456
复制相似问题