我正在使用Stomp.py连接到标准的ACtiveMQ服务器。我正在模拟接收器崩溃的情况,我希望能够重新启动它,并且它继续从导致它崩溃的消息中运行。
我创建了两个示例脚本:
运行测试所采取的步骤:
运行putMessagesToQueue.py
对于步骤4中我想要的行为,我希望它从消息7开始处理。
但是我没看到这个。如果接收者订阅了ack='auto‘,那么在步骤4中它不会处理任何消息--所有的消息都从队列中消失了,我丢失了50条消息!
如果我使用ack='client‘或ack=’client- beginning‘,那么在步骤4中,它从一开始就再次启动,然后在消息6上再次崩溃。
这似乎表明,接收者不是一次处理消息,而是同时接收每一条消息,并在每一条消息中运行。我不想这种行为,因为我想扩大到运行5个累犯,我希望负载分布。现在,第一个接收者,我开始接收所有的消息,并开始遍历它们,接收者2-4只需等待新的消息。我希望接收者一次只接收一条信息!
有谁能给我一些提示,说明我是如何实现这个错误的:
来源
putMessagesToQueue.py
import stomp
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
for x in range(0,5):
conn.send(body="OK-BEFORE-CRASH", destination=destination)
conn.send(body="CRASH", destination=destination)
for x in range(0,50):
conn.send(body="OK-AFTER-CRASH", destination=destination)readMessagesFromQueue.py
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
def __init__(self, processMessage):
self.processMessage = processMessage
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
self.processMessage(headers, message)
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")更新001
通过更改接收函数以使用ack =‘client- described’并手动发送ack消息,我设法获得了上述所需的行为。(见下文新版本)
但我仍然无法让接收者一次处理一条信息。这可以通过以下步骤加以说明:
运行putMessagesToQueue.py
首先,第二个readMessagesFromQueue2在第一个崩溃之前什么也不做,然后开始接收消息。我希望接收者的两个实例从一开始就读取消息。
readMessagesFromQueue2.py
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
conn = None
def __init__(self, processMessage, conn):
self.processMessage = processMessage
self.conn = conn
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
try:
self.processMessage(headers, message)
finally:
self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")发布于 2020-05-05 11:10:35
大量阅读不同的文档,我发现了这个问题。
ActiveMQ有一个选项预取大小- https://svn.apache.org/repos/infra/websites/production/activemq/content/5.7.0/what-is-the-prefetch-limit-for.html
如果您有很少的消息需要很长时间来处理,您可以将其设置为1。在其他情况下,这不是一种丙酸盐。
我可以在stopm.py中使用以下行来完成这一任务: conn.subscribe(destination=destination,id=1,ack=‘client- following’,headers={‘activemq.pregetchSize’:1})
因此,使用手动或自动ack既不是这里也不是那里。键将预取限制为1。
https://stackoverflow.com/questions/61609815
复制相似问题