首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用SmallRye动态响应消息发布/订阅MQTT

使用SmallRye动态响应消息发布/订阅MQTT
EN

Stack Overflow用户
提问于 2020-07-13 20:09:44
回答 1查看 1K关注 0票数 3

我们尝试使用小黑眼反应性消息发布和订阅MQTT协议。通过以下简单代码,我们成功地将消息实际发布到特定的主题/通道

代码语言:javascript
复制
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class Publish {
    
    @Outgoing("pao")
    public Multi<String> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> "A Message in here");
    }
}

我们想要做的是在任何时候使用动态主题调用generate()方法,用户将在其中定义它。这是我们的问题,但是我们从github中的存储库中找到了这些类。包名io.smallrye.reactive.messaging.mqtt

例如,我们发现有一个类表示它对MQTT代理(Mosquitto server up)进行了发布调用。

在这个声明中,我们得到了在'io.smallrye.reactive.messaging.mqtt.SendingMqttMessage'.中不公开的SendingMqttMessage<String>下面的红色下划线:'SendingMqttMessage(java.lang.String,java.lang.String,io.netty.handler.codec.mqtt.MqttQoS,boolean)‘无法从外部包访问

更新(发布完成)最终向mqtt (一台蚊虫服务器)发出了发布请求,所有这些都带有用户配置的动态主题。正如我们所发现的,以前的类SendingMqttMessage根本不应该被使用。我们发现,我们还需要和发射器实际提出一个动态主题的发布请求。

代码语言:javascript
复制
    @Inject
    @Channel("panatha")
    Emitter<String> emitter;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createUser(Device device) {
        System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
        emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
        return Response.ok().status(Response.Status.CREATED).build();
    }

现在,我们需要了解如何动态订阅某个主题。

EN

回答 1

Stack Overflow用户

发布于 2022-01-10 12:55:03

首先要把我们引向同一页:

反应性消息传递使不与主题一起工作,而与通道一起工作。这一点很重要,因为您可以将或写入通道。因此,如果要同时提供这两种功能,则需要配置指向同一主题的两个通道,一个是传入的,另一个是传出的。

回答你的问题:

你在排放器上做了一个很好的开始,但是你仍然缺乏你想要的动态特性。在您的例子中,您通过CDI获得了发射器。

现在,我们所需要的就是这样的动态,因为我们可以在运行时使用CDI动态注入Beans,如下所示:

发送消息

代码语言:javascript
复制
private Emitter<byte[]> dynamicEmitter(String topic){
        return CDI.current().select(new TypeLiteral<Emitter<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

还请注意,我正在创建一个类型为byte[]的发射器,因为根据它的文档,这是小黑麦-mqtt连接器(3.4.0版)目前唯一的支持类型。

接收讯息

要从反应性消息通道读取消息,可以使用发射器的对应项,即Publisher。

它可以用来模拟:

代码语言:javascript
复制
private Publisher<byte[]> dynamicReceiver(String topic){
        return CDI.current().select(new TypeLiteral<Publisher<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

然后,您可以以任何您喜欢的方式处理这些日期。作为演示,它将它挂在一个简单的REST端点上

代码语言:javascript
复制
@GET
    @Produces(MediaType.SERVER_SENT_EVENTS) 
    public Multi<String> stream(@QueryParam("topic") String topic) {
        return Multi.createFrom().publisher(dynamicReceiver(topic)).onItem().transform(String::new); 
    }
    
    @GET
    @Path("/publish")
    public boolean publish(@QueryParam("msg") String msg, @QueryParam("topic") String topic) {
        dynamicEmitter(topic).send(msg.getBytes());
        return true; 
    }

还有一件事

在创建此解决方案时,我遇到了一些您应该知道的问题:

  1. Quarkus删除所有“未使用”的CDI。因此,如果您想动态地注入它们,则需要排除这些特性,或者关闭该特性。
  2. 以这种方式注入的所有通道都必须配置。否则注入就会失败。
  3. 由于某些原因(即使是完全禁用的清除),我无法动态地注入发射器,除非它们被注入到其他地方。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62883516

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档