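I have some questions about the new routing feature in Spring Cloud Stream.
I am trying to implement a simple scenario: I want to send a message with the header spring.cloud.function.definition set to consume1 or consume2.
I expect consume1 or consume2 to be invoked according to the value sent in the message header, but the methods are invoked at random.
I send the messages to the consumer exchange using the RabbitMQ management console.
I get the following logs: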
2020-02-27 14:48:25.896 INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer : ==============>consume1 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=9a4dff25-88ef-4d76-93e2-c8719cda122d, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, sourceData=(Body:'[B@3a92faa7(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, consumerQueue=consumer.app]), timestamp=1582811303347}]]
2020-02-27 14:48:25.984 INFO 22132 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-02-27 14:48:25.984 INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-02-27 14:48:25.991 INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 7 ms
2020-02-27 14:48:26.037 INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel customer-1
2020-02-27 14:48:26.111 INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.customer-1' has 1 subscriber(s).
2020-02-27 14:48:26.116 INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2020-02-27 14:48:26.123 INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#32438e24:0/SimpleConnection@3e58666d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62514]
2020-02-27 14:48:26.139 INFO 22132 --- [-1.customer-1-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:48:26.140 INFO 22132 --- [-1.customer-1-1] com.example.demo.TestSink : Data received customer-1...body
2020-02-27 14:49:14.185 INFO 22132 --- [ consumer.app-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.194 INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer : ==============>consume2 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=33581edb-2832-1c92-b765-a05794512b34, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, sourceData=(Body:'[B@8159793(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, consumerQueue=consumer.app]), timestamp=1582811354186}]]
2020-02-27 14:49:14.203 INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel customer-2
2020-02-27 14:49:14.213 INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.customer-2' has 1 subscriber(s).
2020-02-27 14:49:14.216 INFO 22132 --- [-2.customer-2-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.216 INFO 22132 --- [-2.customer-2-1] com.example.demo.TestSink : Data received customer-2...body

application.yml

spring:
  main:
    allow-bean-definition-overriding: true
spring.cloud.stream:
  function.definition: supplier;receive1;receive2;consume1;consume2
  function.routing:
    enabled: true
  bindings:
    consume1-in-0.destination: consumer
    consume1-in-0.group: app
    consume2-in-0.destination: consumer
    consume2-in-0.group: app
    receive1-in-0.destination: customer-1
    receive1-in-0.group: customer-1
    receive2-in-0.destination: customer-2
    receive2-in-0.group: customer-2

DemoApplication.java

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.HttpStatus
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import java.util.function.Consumer
import java.util.function.Supplier

@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

@RestController
class DynamicDestinationController(private val jsonMapper: ObjectMapper) {
    private val processor: EmitterProcessor<Message<String>> = EmitterProcessor.create<Message<String>>()

    // Publishes a message whose sendto header names the target destination.
    @RequestMapping(path = ["/api/dest/{destName}"], method = [GET], consumes = ["*/*"])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun handleRequest(@PathVariable destName: String) {
        val message: Message<String> = MessageBuilder.withPayload("body")
            .setHeader("spring.cloud.stream.sendto.destination", destName).build()
        processor.onNext(message)
    }

    @Bean
    fun supplier(): Supplier<Flux<Message<String>>> {
        return Supplier { processor }
    }
}

const val destResourceUrl = "http://localhost:8080/api/dest"

@Component
class TestConsumer() {
    private val restTemplate: RestTemplate = RestTemplate()
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun consume1(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume1 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-1", String::class.java)
    }

    @Bean
    fun consume2(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume2 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-2", String::class.java)
    }
}

@Component
class TestSink {
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun receive1(): Consumer<String> = Consumer {
        logger.info("Data received customer-1..." + it)
    }

    @Bean
    fun receive2(): Consumer<String> = Consumer {
        logger.info("Data received customer-2..." + it)
    }
}

Any idea how to fix the routing to the consumers?
Thanks in advance.

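Answered 2020-02-27 18:30:49
Actually, I am a bit confused, so let's take this one step at a time. Below is a working application (modeled on yours) that uses the sendto feature, which lets you send messages to specific (existing and/or dynamically resolved) destinations.
(It is in Java; you can rework it into Kotlin.)
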
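import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

// Enables auto-configuration so that SpringApplication.run can bootstrap the binder.
@SpringBootApplication
@Controller
public class WebSourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class,
            "--spring.cloud.function.definition=supplier;consA;consB",
            "--spring.cloud.stream.bindings.consA-in-0.destination=consumerA",
            "--spring.cloud.stream.bindings.consA-in-0.group=consumerA-grp",
            "--spring.cloud.stream.bindings.consB-in-0.destination=consumerB",
            "--spring.cloud.stream.bindings.consB-in-0.group=consumerB-grp"
        );
    }

    EmitterProcessor<Message<String>> processor = EmitterProcessor.create();

    // The path variable names the destination the message is sent to.
    @RequestMapping(path = "/api/dest/{destName}", consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body, @PathVariable String destName) {
        Message<String> message = MessageBuilder.withPayload(body)
            .setHeader("spring.cloud.stream.sendto.destination", destName)
            .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA: " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB: " + v);
        };
    }
}

When I curl it, I get consistent invocation of the appropriate consumer:
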
curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerA
log: Consuming from consA: Hello Spring Cloud Stream
. . .
curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerB
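log: Consuming from consB: Hello Spring Cloud Stream

Note: the routing property is not enabled here. The main purpose of that feature is to always invoke a single function, functionRouter, and have it invoke other functions on your behalf. It is a feature of spring-cloud-function, which means it works outside of binders and channels/destinations.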
Isn't this what you are trying to accomplish: sending messages to different destinations based on a path variable in the HTTP request?
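
To make the idea of header-driven destinations concrete, here is a minimal plain-Java sketch. It is a toy model only, not how Spring Cloud Stream is implemented: the class SendToSketch and its methods send and delivered are hypothetical names, and an in-memory map stands in for the broker. Only the header name comes from the application above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical class): an in-memory map stands in for the broker.
public class SendToSketch {
    // Header name taken from the application above.
    static final String SEND_TO = "spring.cloud.stream.sendto.destination";

    // Destination name -> payloads delivered to that destination.
    private final Map<String, List<String>> destinations = new HashMap<>();

    // Delivers the payload to the destination named in the header,
    // creating the destination on first use ("dynamically resolved").
    public void send(Map<String, String> headers, String payload) {
        String dest = headers.get(SEND_TO);
        if (dest == null) {
            throw new IllegalArgumentException("missing header " + SEND_TO);
        }
        destinations.computeIfAbsent(dest, k -> new ArrayList<>()).add(payload);
    }

    // Returns what has been delivered to a destination (empty if it does not exist).
    public List<String> delivered(String dest) {
        return destinations.getOrDefault(dest, List.of());
    }

    public static void main(String[] args) {
        SendToSketch broker = new SendToSketch();
        broker.send(Map.of(SEND_TO, "consumerA"), "Hello");
        broker.send(Map.of(SEND_TO, "consumerB"), "World");
        System.out.println("consumerA: " + broker.delivered("consumerA"));
        System.out.println("consumerB: " + broker.delivered("consumerB"));
    }
}
```

The point of the sketch is that the sender picks the destination, and each consumer reads only from its own destination, so no routing decision is needed on the consuming side.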
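
Answered 2020-02-27 18:59:26
And here is an example of a different microservice, one that receives messages on the routing function and then routes them to different functions.
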
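import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

// Enables auto-configuration so that SpringApplication.run can bootstrap the binder.
@SpringBootApplication
public class FunctionRoutingApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionRoutingApplication.class,
            "--spring.cloud.stream.function.routing.enabled=true"
        );
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA: " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB: " + v);
        };
    }
}

That is pretty much it. Go to the broker and send data to the functionRouter-in-0 exchange while providing the header spring.cloud.function.definition=consA (or consB), and you will see consistent invocation.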
Am I still missing something?
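
As a rough illustration of what routing on the spring.cloud.function.definition header amounts to, here is a minimal plain-Java sketch. It is an assumption-laden toy, not the actual functionRouter from spring-cloud-function: the class HeaderRouterSketch and its methods register and route are hypothetical, and only the header name and the function names consA and consB come from the example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model (hypothetical class): a single entry point that dispatches by header.
public class HeaderRouterSketch {
    // Header name taken from the example above.
    static final String DEFINITION = "spring.cloud.function.definition";

    // Function name -> function, standing in for the function catalog.
    private final Map<String, Consumer<String>> functions = new LinkedHashMap<>();

    public void register(String name, Consumer<String> fn) {
        functions.put(name, fn);
    }

    // Every message enters here; the header alone decides which function runs.
    public void route(Map<String, String> headers, String payload) {
        String name = headers.get(DEFINITION);
        Consumer<String> target = functions.get(name);
        if (target == null) {
            throw new IllegalStateException("no function registered for: " + name);
        }
        target.accept(payload);
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        HeaderRouterSketch router = new HeaderRouterSketch();
        router.register("consA", v -> calls.add("consA: " + v));
        router.register("consB", v -> calls.add("consB: " + v));

        router.route(Map.of(DEFINITION, "consB"), "ok");
        router.route(Map.of(DEFINITION, "consA"), "ok");
        System.out.println(calls);
    }
}
```

In this model there is exactly one consumer of incoming messages (the router), which is why the outcome depends only on the header and never on which consumer happened to receive the message.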
https://stackoverflow.com/questions/60434680