首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >与春云流及功能连接的路径

与春云流及功能连接的路径
EN

Stack Overflow用户
提问于 2020-02-27 13:45:35
回答 2查看 2.1K关注 0票数 0

我对春云流中的新路由特性有一些问题。

我试着实现一个简单的场景,我想用头spring.cloud.function.definition = consume1或consume2发送一条消息

我希望consume1或consume2应该根据消息头上发送的内容进行调用,但是方法是随机调用的。

我使用兔子管理控制台将消息发送给exchange使用者。

我有以下日志:

代码语言:javascript
复制
2020-02-27 14:48:25.896  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume1 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=9a4dff25-88ef-4d76-93e2-c8719cda122d, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, sourceData=(Body:'[B@3a92faa7(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, consumerQueue=consumer.app]), timestamp=1582811303347}]]
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-02-27 14:48:25.991  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 7 ms
2020-02-27 14:48:26.037  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-1
2020-02-27 14:48:26.111  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-1' has 1 subscriber(s).
2020-02-27 14:48:26.116  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-02-27 14:48:26.123  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#32438e24:0/SimpleConnection@3e58666d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62514]
2020-02-27 14:48:26.139  INFO 22132 --- [-1.customer-1-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:48:26.140  INFO 22132 --- [-1.customer-1-1] com.example.demo.TestSink                : Data received customer-1...body
2020-02-27 14:49:14.185  INFO 22132 --- [ consumer.app-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.194  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume2 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=33581edb-2832-1c92-b765-a05794512b34, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, sourceData=(Body:'[B@8159793(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, consumerQueue=consumer.app]), timestamp=1582811354186}]]
2020-02-27 14:49:14.203  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-2
2020-02-27 14:49:14.213  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-2' has 1 subscriber(s).
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] com.example.demo.TestSink                : Data received customer-2...body

application.yml

代码语言:javascript
复制
spring:
  main:
    allow-bean-definition-overriding: true
spring.cloud.stream:
  function.definition: supplier;receive1;receive2;consume1;consume2
  function.routing:
    enabled: true

  bindings:
    consume1-in-0.destination: consumer
    consume1-in-0.group: app
    consume2-in-0.destination: consumer
    consume2-in-0.group: app
    receive1-in-0.destination: customer-1
    receive1-in-0.group: customer-1
    receive2-in-0.destination: customer-2
    receive2-in-0.group: customer-2

DemoApplication.java

代码语言:javascript
复制
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.HttpStatus
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import java.util.function.Consumer
import java.util.function.Supplier


@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

@RestController
class DynamicDestinationController(private val jsonMapper: ObjectMapper) {

    private val processor: EmitterProcessor<Message<String>> = EmitterProcessor.create<Message<String>>()

    @RequestMapping(path = ["/api/dest/{destName}"], method = [GET], consumes = ["*/*"])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun handleRequest(@PathVariable destName:String) {
        val message: Message<String> = MessageBuilder.withPayload("body")
                .setHeader("spring.cloud.stream.sendto.destination", destName).build()
        processor.onNext(message)
    }

    @Bean
    fun supplier(): Supplier<Flux<Message<String>>> {
        return Supplier { processor }
    }
}

const val destResourceUrl = "http://localhost:8080/api/dest"
@Component
class TestConsumer() {

    private val restTemplate: RestTemplate = RestTemplate()
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun consume1(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume1 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-1", String::class.java)
    }

    @Bean
    fun consume2(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume2 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-2", String::class.java)
    }
}


@Component
class TestSink {
    private val logger: Log = LogFactory.getLog(javaClass)
    @Bean
    fun receive1(): Consumer<String> = Consumer {
        logger.info("Data received customer-1..." + it);
    }

    @Bean
    fun receive2(): Consumer<String> = Consumer {
        logger.info("Data received customer-2..." + it);
    }
}

知道如何修复通往消费者的路线吗?

提前谢谢。

演示-回购

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-02-27 18:30:49

实际上,我有点困惑,所以让我们现在做一个步骤。下面是功能(模仿您的)应用程序,它使用斯本托功能,允许您将消息发送到特定(现有和/或动态解析的)目的地。

(在java中,您可以将其重做到Kotlin)

代码语言:javascript
复制
@Controller
public class WebSourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class,
                "--spring.cloud.function.definition=supplier;consA;consB",
                "--spring.cloud.stream.bindings.consA-in-0.destination=consumerA",
                "--spring.cloud.stream.bindings.consA-in-0.group=consumerA-grp",
                "--spring.cloud.stream.bindings.consB-in-0.destination=consumerB",
                "--spring.cloud.stream.bindings.consB-in-0.group=consumerB-grp"
                );
    }

    EmitterProcessor<Message<String>> processor = EmitterProcessor.create();

    @RequestMapping(path = "/api/dest/{destName}", consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body, @PathVariable String destName) {
        Message<String>  message = MessageBuilder.withPayload(body)
            .setHeader("spring.cloud.stream.sendto.destination", destName)
            .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

当我卷起它时,就会得到一致的调用pr,即适当的使用者:

代码语言:javascript
复制
curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerA
log: Consuming from consA:  Hello Spring Cloud Stream
. . .

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerB
log: Consuming from consB:  Hello Spring Cloud Stream

注意:没有启用路由属性。该功能的主要目的是始终调用一个函数functionRouter,并让它代表您调用其他函数。它是spring云功能的一个特性,这意味着它在和通道/目的地之外工作。

这不是你想要完成的吗?根据HTTP请求中的某个宣誓变量向不同的目的地发送消息?

票数 2
EN

Stack Overflow用户

发布于 2020-02-27 18:59:26

下面是一个不同的微服务的例子,它接收路由功能,然后路由到不同的功能。

代码语言:javascript
复制
public class FunctionRoutingApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionRoutingApplication.class,
                "--spring.cloud.stream.function.routing.enabled=true"
                );
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

差不多就是这样了。转到代理并将数据发送到functionRouter-in-0 exchange,同时提供spring.cloud.function.definition=consA/consB头,您将看到一致的调用。

我还漏掉了什么吗?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60434680

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档