我尝试使用ApachePythonArtemis2.24发送和接收消息,使用ActiveMQ使用STOMP发送消息,并使用接收消息。Stomp.py版本为8.0.1。Python版本为3.10.4。Java版本是1.8.0_342。
如果我运行基于Java的消息使用者并使用Java代码发送消息,那么一切都正常。但是,如果我使用Stomp.py向队列发送消息,则在接收端得到以下异常:
Exception in thread "main" java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1864388729 is greater than readableBytes=10
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:103)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:54)消息使用者如下所示:
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ActiveMQConsumerMain
{
public static void main(String[] args) throws Exception
{
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
// We need a queue attached to the address ...
try
{
session.createQueue("example", RoutingType.ANYCAST, "example", true);
}
catch( ActiveMQQueueExistsException amqe )
{
if( amqe.getMessage().contains("already exists" ))
{
// no problem, our queue already exists
System.out.println( "Queue already exists on server!" );
}
else
{
amqe.printStackTrace();
}
}
// And a consumer attached to the queue ...
ClientConsumer consumer = session.createConsumer("example");
session.start();
while( true )
{
System.out.println( "Listening..." );
ClientMessage msgReceived = consumer.receive();
System.out.println("message = " + msgReceived.getBodyBuffer().readString());
msgReceived.acknowledge();
session.commit();
}
}
}工作的Java消息生成程序如下所示:
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ActiveMQProducerMain
{
public static void main(String[] args) throws Exception
{
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
ClientProducer producer = session.createProducer("example");
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("Hello world!!!");
// We need a queue attached to the address ...
try
{
session.createQueue("example", RoutingType.ANYCAST, "example", true);
}
catch( ActiveMQQueueExistsException amqe )
{
if( amqe.getMessage().contains("already exists" ))
{
// no problem, out queue already exists
System.out.println( "Queue already exists on server!" );
}
else
{
amqe.printStackTrace();
}
}
// Once we have a queue, we can send the message ...
producer.send(message);
}
}Python消息生产者代码如下所示:
import time
import stomp
def main():
print( "Sending ActiveMQ message using STOMP client!\n" )
conn = stomp.Connection( [('172.16.1.141', 61613)] )
conn.connect( wait=True, headers={'consumerWindowSize': 0})
conn.send(body='Hello Python World', destination='example')
time.sleep(5)
conn.disconnect()
exit()
if __name__ == "__main__":
main()编辑1:
尝试了其他的东西--我设置了一个Stomp.py消息使用者,当我运行它并使用Stomp.py发送消息时,一切都很好。因此,Java和Python客户端库似乎从根本上都是“工作”的,但是两者的交集(例如,从Python发送,用Java接收)的一些东西正在崩溃。
编辑2:
另外,如果我运行基于Python的消息使用者,并使用Java消息生成器发送消息,Python代码将收到一条消息,但是消息的内容似乎是空的。因此,看起来在"Java土地“中发生的事情和在"Python陆地”中发生的事情之间有一些奇怪的不匹配。
对是什么原因有什么想法吗?
编辑3:
下面我尝试根据贾斯汀的答案切换到readNullableSimpleString(),所以现在我的代码如下所示:
System.out.println("message = " + msgReceived.getBodyBuffer().readNullableSimpleString() );现在我明白了:
java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1701604463 is greater than readableBytes=13
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
at org.apache.activemq.artemis.api.core.SimpleString.readNullableSimpleString(SimpleString.java:158)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:69)
at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:64)看起来,对readNullableSimpleString的调用最终仍会触发对readSimpleString()的调用,该调用将出错。
编辑4:
我仍然不明白“为什么”到底发生了什么,但我确实找到了在消费者中成功获取我的信息的方法。这一机制起着以下作用:
int bodySize = msgReceived.getBodySize();
byte[] bytes = new byte[bodySize];
msgReceived.getBodyBuffer().readBytes( bytes );
System.out.println("message = " + new String( bytes ) );看起来,STOMP消息正在被转换成BytesMessage而不是引擎盖下的TextMessage?或者接近那个的东西。而我发现的一些较早的讨论表明,这反过来又与内容长度标题的存在或不存在有关。真奇怪。但现在我很高兴我至少有一条前进的道路,即使这不是一条完美的前进之路。
发布于 2022-09-18 01:21:38
当代理接收到没有content-length头的STOMP消息时,它使用writeNullableSimpleString方法将主体编码为可为空的org.apache.activemq.artemis.api.core.ActiveMQBuffer。因此,当您将该消息作为核心消息接收时,需要使用SimpleString上的readNullableSimpleString方法将该数据作为可空的org.apache.activemq.artemis.api.core.ActiveMQBuffer读取。这在文献资料中得到了概述。
这与您的核心生产者和核心消费者一起工作的原因是他们分别使用writeString和readString。
同样,它不适用于核心生产者和Python使用者,因为核心生产者正在使用writeString,并且消息正在被转换为使用readNullableSimpleString的STOMP MESSAGE框架,当然,这是不起作用的。
需要注意的是,如何从ActiveMQBuffer中获取ClientMessage很重要,因为我们在缓冲区内部跟踪索引的方式很重要。例如,JavaDoc on ClientMessage#getBodyBuffer声明:
写主体的缓冲器。 警告:如果您只想读取消息的内容,请使用getDataBuffer()或getReadOnlyBuffer();
因此,当您使用readNullableSimpleString时,您将希望通过使用msgReceived.getDataBuffer().readNullableSimpleString()或msgReceived.getReadOnlyBuffer().readNullableSimpleString()来实现
最后,请记住,消息的主体最终只是一个字节数组。代理无法知道数组中的数据类型。它可以是人类可读的文本,也可以是二进制数据,即使是文本,也可以通过多种方式进行编码。为了使客户端能够相互交换消息,它们必须使用一种通用格式。在这种情况下,使用核心API的Java客户端必须使用可空的SimpleString。
发布于 2022-09-18 03:32:17
好吧,我想我终于明白这是怎么回事了。有几个不同的问题在起作用,它们之间的相互作用在早期就令人困惑。
我目前的理解是:
因此,当没有内容长度的标题时,这将成功地读取消息内容。
ActiveMQBuffer tempBuff = msgReceived.getDataBuffer();
System.out.println( "tempBuff: " + tempBuff.readNullableSimpleString() );最后,我的问题是:
有了这种理解,我现在就可以在Python端设置内容长度标头的情况下读取Java端的消息。
https://stackoverflow.com/questions/73759294
复制相似问题