我创建了一个新的均衡器,并试图向eventHubA发布消息。当我试图发送消息时,我得到以下错误:
java.lang.IllegalStateException: namespacexxxxx entityPathxxxxx:无法订阅。处理器已在com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:217)处终止
下面是我正在使用的代码片段:
public void send(Response response) {
String responseInString = JsonHandlingUtil.objectToJsonString(response);
EventData eventData = new EventData(responseInString);
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}我已经将eventhubProducerClient定义为Bean。
@Bean
public EventHubProducerClient eventHubProducerClient() {
return new EventHubClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(connectionString, eventHubName)
.buildProducerClient();
}下面是我的gradle依赖项
> //eventhub
> implementation 'com.azure:azure-messaging-eventhubs:5.7.0'
> implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.4.6'发布于 2021-07-12 14:01:17
从follow-up问题来看,似乎根本原因已经被确认为send方法中的producer.close()调用。
由于看起来他的生产者是由应用程序作为一个单例来管理的,所以在事件不再发布的时候,例如当应用程序关闭时,可以调用close。
https://stackoverflow.com/questions/68297419
复制相似问题