我们尝试使用小黑眼反应性消息发布和订阅MQTT协议。通过以下简单代码,我们成功地将消息实际发布到特定的主题/通道
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
@ApplicationScoped
public class Publish {
@Outgoing("pao")
public Multi<String> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> "A Message in here");
}
}我们想要做的是在任何时候使用动态主题调用generate()方法,用户将在其中定义它。这是我们的问题,但是我们从github中的存储库中找到了这些类。包名io.smallrye.reactive.messaging.mqtt
例如,我们发现有一个类表示它对MQTT代理(Mosquitto server up)进行了发布调用。
在这个声明中,我们得到了在'io.smallrye.reactive.messaging.mqtt.SendingMqttMessage'.中不公开的SendingMqttMessage<String>下面的红色下划线:'SendingMqttMessage(java.lang.String,java.lang.String,io.netty.handler.codec.mqtt.MqttQoS,boolean)‘无法从外部包访问
更新(发布完成)最终向mqtt (一台蚊虫服务器)发出了发布请求,所有这些都带有用户配置的动态主题。正如我们所发现的,以前的类SendingMqttMessage根本不应该被使用。我们发现,我们还需要和发射器实际提出一个动态主题的发布请求。
@Inject
@Channel("panatha")
Emitter<String> emitter;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createUser(Device device) {
System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
return Response.ok().status(Response.Status.CREATED).build();
}现在,我们需要了解如何动态订阅某个主题。
发布于 2022-01-10 12:55:03
首先要把我们引向同一页:
反应性消息传递使不与主题一起工作,而与通道一起工作。这一点很重要,因为您可以将或写入通道。因此,如果要同时提供这两种功能,则需要配置指向同一主题的两个通道,一个是传入的,另一个是传出的。
回答你的问题:
你在排放器上做了一个很好的开始,但是你仍然缺乏你想要的动态特性。在您的例子中,您通过CDI获得了发射器。
现在,我们所需要的就是这样的动态,因为我们可以在运行时使用CDI动态注入Beans,如下所示:
发送消息
private Emitter<byte[]> dynamicEmitter(String topic){
return CDI.current().select(new TypeLiteral<Emitter<byte[]>>() {}, new ChannelAnnotation(topic)).get();
}还请注意,我正在创建一个类型为byte[]的发射器,因为根据它的文档,这是小黑麦-mqtt连接器(3.4.0版)目前唯一的支持类型。
接收讯息
要从反应性消息通道读取消息,可以使用发射器的对应项,即Publisher。
它可以用来模拟:
private Publisher<byte[]> dynamicReceiver(String topic){
return CDI.current().select(new TypeLiteral<Publisher<byte[]>>() {}, new ChannelAnnotation(topic)).get();
}然后,您可以以任何您喜欢的方式处理这些日期。作为演示,它将它挂在一个简单的REST端点上
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream(@QueryParam("topic") String topic) {
return Multi.createFrom().publisher(dynamicReceiver(topic)).onItem().transform(String::new);
}
@GET
@Path("/publish")
public boolean publish(@QueryParam("msg") String msg, @QueryParam("topic") String topic) {
dynamicEmitter(topic).send(msg.getBytes());
return true;
}还有一件事
在创建此解决方案时,我遇到了一些您应该知道的问题:
https://stackoverflow.com/questions/62883516
复制相似问题