我工作的公司有一个Python测试脚本,它使用Python3.x向AMQ (input)主题发布消息。测试脚本被配置为使用来自输出主题的结果消息。我编写了一个Python脚本,它接收主题上的AMQ消息,处理它并将另一条消息发布到另一个(输出)主题上。
我的问题是,在Ubuntu v20中运行测试脚本时,我会收到以下错误消息:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/home/robot/.local/lib/python3.6/site-packages/stomp/transport.py", line 348, in __receiver_loop
self.process_frame(f, frame)
File "/home/robot/.local/lib/python3.6/site-packages/stomp/transport.py", line 193, in process_frame
self.notify(frame_type, f)
File "/home/robot/.local/lib/python3.6/site-packages/stomp/transport.py", line 248, in notify
notify_func(frame)
TypeError: on_message() missing 1 required positional argument: 'message'下面是在输入主题上接收消息、处理消息并将消息放置在输出主题上的代码:
#!/usr/bin/env python3
import time
import os
import threading
import logging
import signal
import stomp
import config_with_yaml as config
import sys
from src import factory
console = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(name)-12s %(levelname)-8s %(message)s')
console.setFormatter(formatter)
logging.getLogger().addHandler(console)
logging.getLogger().setLevel(logging.INFO)
LOGGER = logging.getLogger('my-client')
class MyConfig:
def __init__(self):
# boiler plate config
config = MyConfig()
def connect_and_subscribe(conn):
conn.connect(config.amq_username, config.amq_password, wait=True, headers={'client-id': config.client_id})
conn.subscribe(destination=config.in_topic, id=config.subscription_id, ack='auto',
headers={'activemq.subscriptionName': config.subscription_name, 'activemq.prefetchSize': 1})
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, headers, message):
logging.error('received an error "%s"' % message)
def on_message(self, headers, message):
# MESSAGE PROCESSING START
logging.debug('message recieved')
logging.debug('headers: "%s"' % str(headers))
logging.debug('message: "%s"' % message)
logging.debug('processing "%s" message' % config.name)
df = factory.dataflow_factory(config.location, config.name.lower(), message)
body = df.process()
logging.debug('message processed: "%s"' % str(body))
# # MESSAGE PROCESSING END
# # Send response message
self.conn.send(body=body, destination=config.out_topic)
logging.debug('response sent')
def on_disconnected(self):
logging.debug('disconnected, reconnecting...')
with self.conn.need_reconnect:
self.conn.need_reconnect.notify()
class ConnectThread(threading.Thread):
def __init__(self, conn):
threading.Thread.__init__(self)
self.daemon = True
self.conn = conn
def run(self):
while True:
try:
if not self.conn.is_connected():
logging.info("client ready to connect to AMQ")
connect_and_subscribe(self.conn)
with self.conn.need_reconnect:
self.conn.need_reconnect.wait()
except Exception as ex:
template = "An exception of type {0} occurred. Arguments:\n{1!r}"
message = template.format(type(ex).__name__, ex.args)
logging.warning(message)
time.sleep(5)
class ActiveDaemon(object):
def __init__(self):
self.stop = False
signal.signal(signal.SIGINT, self.go_down)
self.conn = stomp.Connection(host_and_ports=config.hosts, keepalive=True, reconnect_attempts_max=1)
self.conn.set_ssl(for_hosts=config.hosts, key_file=config.amq_key_file_location,
cert_file=config.amq_cert_file_location)
self.conn.need_reconnect = threading.Condition()
self.conn.set_listener('', MyListener(self.conn))
def run(self):
connection_thread = ConnectThread(self.conn)
connection_thread.start()
logging.info("client daemon running")
while not self.stop:
time.sleep(2)
self.conn.disconnect()
def go_down(self, signum, frame):
logging.info("Exit command received")
self.stop = True
if __name__ == "__main__":
logging.info("client starting...")
daemon = ActiveDaemon()
logging.info("client daemon created")
daemon.run()下面是脚本中的使用者代码。
class Consumer(ActiveMQConnection):
def __init__(self, host: str, port: str, cert_key_location: str, cert_file_location: str, username: str,
password: str, subscription_topic: str):
self.conn = None
self.subscription_topic = subscription_topic
self.class_name = self.MessageListener
self.username = username
self.password = password
ActiveMQConnection.__init__(self, host=host, port=port, cert_key_location=cert_key_location,
cert_file_location=cert_file_location, is_consumer=True)
def connect_and_subscribe(self, expected_msg_count: int, time_limit_seconds: int) -> int:
global EXPECTED_MSG_COUNT
EXPECTED_MSG_COUNT = expected_msg_count
while EXPECTED_MSG_COUNT != ACTUAL_MSG_COUNT:
self.conn = self.get_connection()
return ACTUAL_MSG_COUNT
class MessageListener:
def __init__(self, conn: object):
self.conn = conn
def on_error(self, headers, message):
logging.error("Received an error with message: " + message)
def on_message(self, headers, message):
global ACTUAL_MSG_COUNT
logging.debug("Message Received: " + message)
ACTUAL_MSG_COUNT += 1现在,奇怪的是。同事可以在Ubuntu (不确定版本)、相同的Python版本3.6 (也发生在3.8版)和相同的AMQ实例(文档化)和处理消息的相同代码(也是被篡改的)中运行这段代码。
我尝试将输出添加到stomp库中,但没有结果。
不过,我确实注意到,在侦听器循环中,我从stomp的transport.py脚本中观察到两条独立的日志消息:
INFO:stomp.py:Received frame: 'CONNECTED', len(body)=0
INFO:stomp.py:frame type = connected
INFO:stomp.py:frame type = connected
INFO:stomp.py:Sending frame: 'SUBSCRIBE'
INFO:stomp.py:notify_func = <bound method HeartbeatListener.on_connected of <stomp.connect.StompConnection11 object at 0x7fb7033b2160>>
INFO:stomp.py:frame type = before_message
INFO:stomp.py:frame type = before_message
INFO:stomp.py:notify_func = <bound method ConnectionListener.on_before_message of <stomp.connect.StompConnection11 object at 0x7fb7033b2160>>
INFO:stomp.py:Received frame: 'MESSAGE', len(body)=21
INFO:stomp.py:frame type = message
INFO:stomp.py:notify_func = <bound method Consumer.MessageListener.on_message of <__main__.Consumer.MessageListener object at 0x7fb702c9bf60>>
INFO:stomp.py:Receiver loop ended
INFO:stomp.py:frame type = receiver_loop_completed
INFO:stomp.py:frame type = receiver_loop_completed
INFO:stomp.py:notify_func = <bound method ConnectionListener.on_receiver_loop_completed of <stomp.connect.StompConnection11 object at 0x7fb7033b2160>>
INFO:stomp.py:frame type = disconnected
INFO:stomp.py:frame type = disconnected
Exception in thread Thread-2:我不能确定这是否应该发生。
发布于 2022-04-02 02:05:24
您需要根据stomp.py版本修改侦听器类。很可能是不同的OSes附带了不同的stomp.py版本,参见python -c "import stomp; print(stomp.__version__)"。检查stomp模块的版本,并相应地调整您的侦听器/消费者。
在主要的v6+之后,大多数侦听器类函数参数都发生了变化。例如
on_error(self, headers, message) # before 4.1.21至
on_error(self, frame) # after 6.1.0请参阅主文档http://jasonrbriggs.github.io/stomp.py/stomp.html?highlight=listener#module-stomp.listener中的所有更改
https://stackoverflow.com/questions/67300935
复制相似问题