首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用RestTemplate在qpid中创建队列?

如何使用RestTemplate在qpid中创建队列?
EN

Stack Overflow用户
提问于 2019-08-06 21:50:28
回答 2查看 486关注 0票数 0

我正在尝试为使用RabbitMQ的应用程序编写集成测试,为此,我使用Qpid broker。我设法启动了服务器,并且我的测试正在连接到它,但是在启动之前,我需要在Qpid中创建队列。因为我有大量的队列,所以我动态地创建bean:

代码语言:javascript
复制
applicationContext.getBeanFactory().registerSingleton(queueName, queue);

这需要在启动之前创建队列。

这是qpid配置文件:

代码语言:javascript
复制
{
  "name": "tst",
  "modelVersion": "2.0",
  "defaultVirtualHost" : "default",
  "authenticationproviders" : [ {
    "name" : "noPassword",
    "type" : "Anonymous",
    "secureOnlyMechanisms": []
        },
    {
      "name" : "passwordFile",
      "type" : "PlainPasswordFile",
      "path" : "/src/test/resources/passwd.txt",
      "secureOnlyMechanisms": [],
      "preferencesproviders" : [{
        "name": "fileSystemPreferences",
        "type": "FileSystemPreferences",
        "path" : "${qpid.work_dir}${file.separator}user.preferences.json"
        }
      ]
    }
   ],
  "ports" : [
    {
      "name": "AMQP",
      "port": "5673",
      "authenticationProvider": "passwordFile",
      "protocols": [
        "AMQP_0_10",
        "AMQP_0_8",
        "AMQP_0_9",
        "AMQP_0_9_1"
      ]
    }],
  "virtualhostnodes" : [ {
    "name" : "default",
    "type" : "JSON",
    "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
  }]

}

从官方文档(https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130)中我读到可以创建队列来REST调用,所以我尝试使用RestTemplate来实现这一点,但它似乎不能创建队列。

代码语言:javascript
复制
    @BeforeClass
    public static void startup() throws Exception {
        brokerStarter = new BrokerManager();
        brokerStarter.startBroker();

        RestTemplate restTemplate = new RestTemplate();
        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");
        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");
    }

有人能解释一下我做错了什么吗?谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-08-07 17:16:17

我设法通过使用连接工厂解决了这个问题:

代码语言:javascript
复制
            @Autowired
            ConnectionFactory factory;

            ....
            factory.setHost("localhost");
            factory.setPort(qpid_server_port);
            try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
                String queue = "queue-x";
                channel.queueDeclare(queue, true, false, false, null);
                //channel.queueBind(queue, "exchange-x" , "routing-key-x");

            } catch (Exception e) {
                e.printStackTrace();
            }
票数 0
EN

Stack Overflow用户

发布于 2020-07-28 22:45:27

我使用REST API解决了同样的问题。为了创建/删除用于集成测试的队列,我使用以下配置文件(qpid-config.json):

代码语言:javascript
复制
{
  "name": "EmbeddedBroker",
  "modelVersion": "8.0",
  "authenticationproviders": [
    {
      "name": "anonymous",
      "type": "Anonymous"
    }
  ],
  "ports": [
    {
      "name": "AMQP",
      "bindingAddress": "localhost",
      "port": "${qpid.amqp_port}",
      "protocols": [ "AMQP_1_0" ],
      "authenticationProvider": "anonymous",
      "virtualhostaliases" : [ {
        "name" : "nameAlias",
        "type" : "nameAlias"
      }, {
        "name" : "defaultAlias",
        "type" : "defaultAlias"
      }, {
        "name" : "hostnameAlias",
        "type" : "hostnameAlias"
      } ]
    },
    {
      "name" : "HTTP",
      "port" : "${qpid.http_port}",
      "protocols" : [ "HTTP" ],
      "authenticationProvider" : "anonymous"
    }
  ],
  "virtualhostnodes": [
    {
      "name": "default",
      "defaultVirtualHostNode": "true",
      "type": "Memory",
      "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
    }
  ],
  "plugins" : [
    {
      "type" : "MANAGEMENT-HTTP",
      "name" : "httpManagement"
    }
  ]
}

相关的Gradle依赖关系:

代码语言:javascript
复制
    testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0
    testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")
    testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")
    testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")

    testImplementation("org.springframework.boot:spring-boot-starter-webflux")
    testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")

启动代理的代码(Kotlin):

代码语言:javascript
复制
    private fun startQpidBroker() {
        val attributes: MutableMap<String, Any> = HashMap()
        val initialConfig = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!
        attributes["type"] = "Memory"
        attributes["initialConfigurationLocation"] = initialConfig.toExternalForm()
        attributes["startupLoggedToSystemOut"] = true
        System.setProperty("qpid.amqp_port", "$amqpPort")
        System.setProperty("qpid.http_port", "$httpPort")
        // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues
        System.setProperty("qpid.tests.mms.messagestore.persistence", "true")
        broker.startup(attributes)
    }

删除/创建队列的代码:

代码语言:javascript
复制
    private fun recreateQueue(queueName: String) {
        val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}");
        try {
            client.method(HttpMethod.DELETE)
                    .uri("/api/latest/queue/default/$queueName")
                    .retrieve()
                    .toBodilessEntity()
                    .block()
                    .statusCode
        } catch (e: WebClientResponseException) {
            if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable
                throw e
            }
        }

        client.method(HttpMethod.PUT)
                .uri("/api/latest/queue/default/default/$queueName")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))
                .retrieve()
                .toBodilessEntity()
                .block()
                .statusCode
    }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57377624

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档