首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么Python代码在stomp库缺少的消息参数中失败?

为什么Python代码在stomp库缺少的消息参数中失败?
EN

Stack Overflow用户
提问于 2021-04-28 13:12:41
回答 1查看 491关注 0票数 2

我工作的公司有一个Python测试脚本,它使用Python3.x向AMQ (input)主题发布消息。测试脚本被配置为使用来自输出主题的结果消息。我编写了一个Python脚本,它接收主题上的AMQ消息,处理它并将另一条消息发布到另一个(输出)主题上。

我的问题是,在Ubuntu v20中运行测试脚本时,我会收到以下错误消息:

代码语言:javascript
复制
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/robot/.local/lib/python3.6/site-packages/stomp/transport.py", line 348, in __receiver_loop
    self.process_frame(f, frame)
  File "/home/robot/.local/lib/python3.6/site-packages/stomp/transport.py", line 193, in process_frame
    self.notify(frame_type, f)
  File "/home/robot/.local/lib/python3.6/site-packages/stomp/transport.py", line 248, in notify
    notify_func(frame)
TypeError: on_message() missing 1 required positional argument: 'message'

下面是在输入主题上接收消息、处理消息并将消息放置在输出主题上的代码:

代码语言:javascript
复制
#!/usr/bin/env python3

import time
import os
import threading
import logging
import signal
import stomp
import config_with_yaml as config
import sys
from src import factory

console = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(name)-12s %(levelname)-8s %(message)s')
console.setFormatter(formatter)
logging.getLogger().addHandler(console)
logging.getLogger().setLevel(logging.INFO)
LOGGER = logging.getLogger('my-client')


class MyConfig:
    def __init__(self):

    # boiler plate config
 



config = MyConfig()


def connect_and_subscribe(conn):
    conn.connect(config.amq_username, config.amq_password, wait=True, headers={'client-id': config.client_id})
    conn.subscribe(destination=config.in_topic, id=config.subscription_id, ack='auto',
                   headers={'activemq.subscriptionName': config.subscription_name, 'activemq.prefetchSize': 1})


class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, headers, message):
        logging.error('received an error "%s"' % message)

    def on_message(self, headers, message):
        # MESSAGE PROCESSING START
        logging.debug('message recieved')
        logging.debug('headers: "%s"' % str(headers))
        logging.debug('message: "%s"' % message)
        logging.debug('processing "%s"  message' % config.name)

        df = factory.dataflow_factory(config.location, config.name.lower(), message)
        body = df.process()
        logging.debug('message processed: "%s"' % str(body))
        # # MESSAGE PROCESSING END

        # # Send response message
        self.conn.send(body=body, destination=config.out_topic)
        logging.debug('response sent')

    def on_disconnected(self):
        logging.debug('disconnected, reconnecting...')
        with self.conn.need_reconnect:
            self.conn.need_reconnect.notify()


class ConnectThread(threading.Thread):
    def __init__(self, conn):
        threading.Thread.__init__(self)
        self.daemon = True
        self.conn = conn

    def run(self):
        while True:
            try:
                if not self.conn.is_connected():
                    logging.info("client ready to connect to AMQ")
                    connect_and_subscribe(self.conn)
                    with self.conn.need_reconnect:
                        self.conn.need_reconnect.wait()
            except Exception as ex:
                template = "An exception of type {0} occurred. Arguments:\n{1!r}"
                message = template.format(type(ex).__name__, ex.args)
                logging.warning(message)
                time.sleep(5)


class ActiveDaemon(object):
    def __init__(self):
        self.stop = False
        signal.signal(signal.SIGINT, self.go_down)

        self.conn = stomp.Connection(host_and_ports=config.hosts, keepalive=True, reconnect_attempts_max=1)
        self.conn.set_ssl(for_hosts=config.hosts, key_file=config.amq_key_file_location,
                          cert_file=config.amq_cert_file_location)
        self.conn.need_reconnect = threading.Condition()
        self.conn.set_listener('', MyListener(self.conn))

    def run(self):
        connection_thread = ConnectThread(self.conn)
        connection_thread.start()

        logging.info("client daemon running")

        while not self.stop:
            time.sleep(2)

        self.conn.disconnect()

    def go_down(self, signum, frame):
        logging.info("Exit command received")
        self.stop = True


if __name__ == "__main__":
    logging.info("client starting...")
    daemon = ActiveDaemon()
    logging.info("client daemon created")
    daemon.run()

下面是脚本中的使用者代码。

代码语言:javascript
复制
class Consumer(ActiveMQConnection):
    def __init__(self, host: str, port: str, cert_key_location: str, cert_file_location: str, username: str,
                 password: str, subscription_topic: str):
        self.conn = None
        self.subscription_topic = subscription_topic
        self.class_name = self.MessageListener
        self.username = username
        self.password = password
        ActiveMQConnection.__init__(self, host=host, port=port, cert_key_location=cert_key_location,
                                    cert_file_location=cert_file_location, is_consumer=True)

    def connect_and_subscribe(self, expected_msg_count: int, time_limit_seconds: int) -> int:
        global EXPECTED_MSG_COUNT
        EXPECTED_MSG_COUNT = expected_msg_count
        while EXPECTED_MSG_COUNT != ACTUAL_MSG_COUNT:
            self.conn = self.get_connection()

        return ACTUAL_MSG_COUNT

    class MessageListener:
        def __init__(self, conn: object):
            self.conn = conn

        def on_error(self, headers, message):
            logging.error("Received an error with message: " + message)

        def on_message(self, headers, message):
            global ACTUAL_MSG_COUNT
            logging.debug("Message Received: " + message)
            ACTUAL_MSG_COUNT += 1

现在,奇怪的是。同事可以在Ubuntu (不确定版本)、相同的Python版本3.6 (也发生在3.8版)和相同的AMQ实例(文档化)和处理消息的相同代码(也是被篡改的)中运行这段代码。

我尝试将输出添加到stomp库中,但没有结果。

不过,我确实注意到,在侦听器循环中,我从stomp的transport.py脚本中观察到两条独立的日志消息:

代码语言:javascript
复制
INFO:stomp.py:Received frame: 'CONNECTED', len(body)=0
INFO:stomp.py:frame type = connected
INFO:stomp.py:frame type = connected
INFO:stomp.py:Sending frame: 'SUBSCRIBE'
INFO:stomp.py:notify_func = <bound method HeartbeatListener.on_connected of <stomp.connect.StompConnection11 object at 0x7fb7033b2160>>
INFO:stomp.py:frame type = before_message
INFO:stomp.py:frame type = before_message
INFO:stomp.py:notify_func = <bound method ConnectionListener.on_before_message of <stomp.connect.StompConnection11 object at 0x7fb7033b2160>>
INFO:stomp.py:Received frame: 'MESSAGE', len(body)=21
INFO:stomp.py:frame type = message
INFO:stomp.py:notify_func = <bound method Consumer.MessageListener.on_message of <__main__.Consumer.MessageListener object at 0x7fb702c9bf60>>
INFO:stomp.py:Receiver loop ended
INFO:stomp.py:frame type = receiver_loop_completed
INFO:stomp.py:frame type = receiver_loop_completed
INFO:stomp.py:notify_func = <bound method ConnectionListener.on_receiver_loop_completed of <stomp.connect.StompConnection11 object at 0x7fb7033b2160>>
INFO:stomp.py:frame type = disconnected
INFO:stomp.py:frame type = disconnected
Exception in thread Thread-2:

我不能确定这是否应该发生。

EN

回答 1

Stack Overflow用户

发布于 2022-04-02 02:05:24

您需要根据stomp.py版本修改侦听器类。很可能是不同的OSes附带了不同的stomp.py版本,参见python -c "import stomp; print(stomp.__version__)"。检查stomp模块的版本,并相应地调整您的侦听器/消费者。

在主要的v6+之后,大多数侦听器类函数参数都发生了变化。例如

代码语言:javascript
复制
on_error(self, headers, message) # before 4.1.21

代码语言:javascript
复制
on_error(self, frame) # after 6.1.0

请参阅主文档http://jasonrbriggs.github.io/stomp.py/stomp.html?highlight=listener#module-stomp.listener中的所有更改

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67300935

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档