首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用RMQ和spring云流创建基于分区的使用者

如何使用RMQ和spring云流创建基于分区的使用者
EN

Stack Overflow用户
提问于 2017-04-27 14:46:17
回答 2查看 497关注 0票数 1

我能够使用云流和兔子mq开发示例消费者,如果我有生产者创建的3个分区,如果我在CF中部署了3个实例,那么每个实例都会选择一个队列并使用索引处理消息。

现在的问题是,如果我有10个分区,我似乎需要10个实例,这是资源的浪费,我们是否可以让一个消费者监听多个分区。我之所以有基于分区的生产者,是因为对我来说,消息顺序是用来处理问题的。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-04-27 15:41:34

有一个办法..。

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    @StreamListener("input1")
    public void foo1(String in) {
        doFoo(in);
    }

    @StreamListener("input2")
    public void foo2(String in) {
        doFoo(in);
    }

    protected void doFoo(String in) {
        System.out.println(in);
    }

    public interface TwoInputs {

        @Input("input1")
        SubscribableChannel input1();

        @Input("input2")
        SubscribableChannel input2();

    }

}

代码语言:javascript
复制
spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0

spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1

这将消耗由answer to your other question中的生产者创建的两个分区。

目前还没有一种让@StreamListener直接监听两个分区的方法。

编辑

这里有另一种方法,使用exchange->exchange绑定..。

生产者

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43614477Application.class, args).close();
    }

    @Autowired
    private MessageChannel output;

    @Autowired
    private AmqpAdmin admin;

    @Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
    private int partitionCount;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String destination;

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < this.partitionCount; i++) {
            String partition = this.destination + "-" + i;
            TopicExchange exchange = new TopicExchange(partition);
            this.admin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
                    .with(partition);
            this.admin.declareBinding(binding);
        }

        output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
        output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
    }

}

代码语言:javascript
复制
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2

Consumer

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void foo1(String in) {
        System.out.println(in);
    }

}

代码语言:javascript
复制
spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1

主交换中的分区被路由到分区交换,并且使用者获得要绑定其队列的交换列表。

您可以将该列表传递到命令行。

票数 1
EN

Stack Overflow用户

发布于 2017-04-27 15:08:04

你为什么会认为这是浪费资源?如果您的需求要求您需要有状态的处理,并且您正在分割成多个分区,那么您将需要N个消费者的N个分区。

如果将不同分区的消息混合在同一个队列中,您的排序就会受到影响。除非您在您的端添加了一些逻辑来基于某些元数据来聚合消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43661064

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档