首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >接收“simpleString中的错误读取,length=[]大于readableBytes=[]”,使用Stomp.py向ActiveMQ发送消息

接收“simpleString中的错误读取,length=[]大于readableBytes=[]”,使用Stomp.py向ActiveMQ发送消息
EN

Stack Overflow用户
提问于 2022-09-18 00:06:27
回答 2查看 88关注 0票数 0

我尝试使用ApachePythonArtemis2.24发送和接收消息,使用ActiveMQ使用STOMP发送消息,并使用接收消息。Stomp.py版本为8.0.1。Python版本为3.10.4。Java版本是1.8.0_342。

如果我运行基于Java的消息使用者并使用Java代码发送消息,那么一切都正常。但是,如果我使用Stomp.py向队列发送消息,则在接收端得到以下异常:

代码语言:javascript
复制
Exception in thread "main" java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1864388729 is greater than readableBytes=10
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:103)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
    at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:54)

消息使用者如下所示:

代码语言:javascript
复制
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ActiveMQConsumerMain 
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");

        ClientSessionFactory factory =  locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );

        // We need a queue attached to the address ...

        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, our queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }
        
        // And a consumer attached to the queue ...

        ClientConsumer consumer =  session.createConsumer("example");

        session.start();
        
        while( true )
        {
            System.out.println( "Listening..." );
            ClientMessage msgReceived = consumer.receive();

            System.out.println("message = " + msgReceived.getBodyBuffer().readString());
            
            msgReceived.acknowledge();
        
            session.commit();
        }
    }
}

工作的Java消息生成程序如下所示:

代码语言:javascript
复制
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ActiveMQProducerMain 
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");

        ClientSessionFactory factory =  locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
        
        ClientProducer producer = session.createProducer("example");
        ClientMessage message = session.createMessage(true);
        message.getBodyBuffer().writeString("Hello world!!!");

        // We need a queue attached to the address ...

        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, out queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }

        // Once we have a queue, we can send the message ...

        producer.send(message); 
    }

}

Python消息生产者代码如下所示:

代码语言:javascript
复制
import time
import stomp

def main():
    print( "Sending ActiveMQ message using STOMP client!\n" )
    
    conn = stomp.Connection( [('172.16.1.141', 61613)] )
    
    conn.connect( wait=True, headers={'consumerWindowSize': 0})
    
    conn.send(body='Hello Python World', destination='example')
    time.sleep(5)
    conn.disconnect()
    
    exit()
    
if __name__ == "__main__":
    main()

编辑1:

尝试了其他的东西--我设置了一个Stomp.py消息使用者,当我运行它并使用Stomp.py发送消息时,一切都很好。因此,Java和Python客户端库似乎从根本上都是“工作”的,但是两者的交集(例如,从Python发送,用Java接收)的一些东西正在崩溃。

编辑2:

另外,如果我运行基于Python的消息使用者,并使用Java消息生成器发送消息,Python代码将收到一条消息,但是消息的内容似乎是空的。因此,看起来在"Java土地“中发生的事情和在"Python陆地”中发生的事情之间有一些奇怪的不匹配。

对是什么原因有什么想法吗?

编辑3:

下面我尝试根据贾斯汀的答案切换到readNullableSimpleString(),所以现在我的代码如下所示:

代码语言:javascript
复制
System.out.println("message = " + msgReceived.getBodyBuffer().readNullableSimpleString() );

现在我明白了:

代码语言:javascript
复制
java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1701604463 is greater than readableBytes=13
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
    at org.apache.activemq.artemis.api.core.SimpleString.readNullableSimpleString(SimpleString.java:158)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:69)
    at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:64)

看起来,对readNullableSimpleString的调用最终仍会触发对readSimpleString()的调用,该调用将出错。

编辑4:

我仍然不明白“为什么”到底发生了什么,但我确实找到了在消费者中成功获取我的信息的方法。这一机制起着以下作用:

代码语言:javascript
复制
                int bodySize = msgReceived.getBodySize();
                byte[] bytes = new byte[bodySize];
                msgReceived.getBodyBuffer().readBytes( bytes );
                System.out.println("message = " + new String( bytes ) );

看起来,STOMP消息正在被转换成BytesMessage而不是引擎盖下的TextMessage?或者接近那个的东西。而我发现的一些较早的讨论表明,这反过来又与内容长度标题的存在或不存在有关。真奇怪。但现在我很高兴我至少有一条前进的道路,即使这不是一条完美的前进之路。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-09-18 01:21:38

当代理接收到没有content-length头的STOMP消息时,它使用writeNullableSimpleString方法将主体编码为可为空的org.apache.activemq.artemis.api.core.ActiveMQBuffer。因此,当您将该消息作为核心消息接收时,需要使用SimpleString上的readNullableSimpleString方法将该数据作为可空的org.apache.activemq.artemis.api.core.ActiveMQBuffer读取。这在文献资料中得到了概述。

这与您的核心生产者和核心消费者一起工作的原因是他们分别使用writeStringreadString

同样,它不适用于核心生产者和Python使用者,因为核心生产者正在使用writeString,并且消息正在被转换为使用readNullableSimpleString的STOMP MESSAGE框架,当然,这是不起作用的。

需要注意的是,如何从ActiveMQBuffer中获取ClientMessage很重要,因为我们在缓冲区内部跟踪索引的方式很重要。例如,JavaDoc on ClientMessage#getBodyBuffer声明:

写主体的缓冲器。 警告:如果您只想读取消息的内容,请使用getDataBuffer()或getReadOnlyBuffer();

因此,当您使用readNullableSimpleString时,您将希望通过使用msgReceived.getDataBuffer().readNullableSimpleString()msgReceived.getReadOnlyBuffer().readNullableSimpleString()来实现

最后,请记住,消息的主体最终只是一个字节数组。代理无法知道数组中的数据类型。它可以是人类可读的文本,也可以是二进制数据,即使是文本,也可以通过多种方式进行编码。为了使客户端能够相互交换消息,它们必须使用一种通用格式。在这种情况下,使用核心API的Java客户端必须使用可空的SimpleString

票数 1
EN

Stack Overflow用户

发布于 2022-09-18 03:32:17

好吧,我想我终于明白这是怎么回事了。有几个不同的问题在起作用,它们之间的相互作用在早期就令人困惑。

我目前的理解是:

  1. stomp.py有一个设置"auto_content_length“,该设置控制库是否自动向消息中添加内容长度标头。然后,ActiveMQ代理从STOMP消息中创建不同的消息类型,具体取决于该标头是否存在。如果头部存在,您将得到一个BytesMessage。否则你会得到一个TextMessage。默认情况下,auto_content_length是真的,所以当这一切开始时,当我天真地期望文本(嘿,我发送一个字符串,对吗?)时,我得到了BytesMessage。
  2. 现在,在Java方面,您必须根据您获得的消息类型而不同地使用API。如果得到一个BytesMessage,则调用getBodyBuffer(),然后使用readBytes()读取字节。我最初获得BytesMessage并调用getBodyBuffer(),但错误地尝试使用各种readString()方法,这是错误的。
  3. 同样在Java方面,如果您得到了一个TextMessage (如果在Python/STOMP端关闭内容长度标头的话),那么您必须做一些不同的事情。在消息对象上调用getDataBuffer(),然后使用readNullableSimpleString()读取底层文本。

因此,当没有内容长度的标题时,这将成功地读取消息内容。

代码语言:javascript
复制
     ActiveMQBuffer tempBuff = msgReceived.getDataBuffer();
     System.out.println( "tempBuff: " + tempBuff.readNullableSimpleString() );

最后,我的问题是:

  1. 不知道auto_content_length设置
  2. 最初不了解ActiveMQ如何使用内容长度标头更改掩码下的消息类型。
  3. 没有意识到您必须调用getBodyBuffer() / readBytes()或getDataBuffer() / readNullableSimpleString(),这是基于您正在处理的消息类型。

有了这种理解,我现在就可以在Python端设置内容长度标头的情况下读取Java端的消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73759294

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档