首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Stomp.py一次不处理一条消息

Stomp.py一次不处理一条消息
EN

Stack Overflow用户
提问于 2020-05-05 09:22:31
回答 1查看 1.3K关注 0票数 1

我正在使用Stomp.py连接到标准的ACtiveMQ服务器。我正在模拟接收器崩溃的情况,我希望能够重新启动它,并且它继续从导致它崩溃的消息中运行。

我创建了两个示例脚本:

  • putMessagesToQueue.py --这将将56条消息放入目标
  • readMessagesFromQueue.py --这将从目的地读取消息。如果它读取第6条消息,它将引发异常。每条消息需要1秒才能处理

运行测试所采取的步骤:

运行putMessagesToQueue.py

  • I

  • readMessagesFromQueue.py -它成功地处理了5条消息,并在消息6

  • I terminate readMessagesFromQueue.py (ctrl-c)

  • 中引发异常,再次运行readMessagesFromQueue.py

对于步骤4中我想要的行为,我希望它从消息7开始处理。

但是我没看到这个。如果接收者订阅了ack='auto‘,那么在步骤4中它不会处理任何消息--所有的消息都从队列中消失了,我丢失了50条消息!

如果我使用ack='client‘或ack=’client- beginning‘,那么在步骤4中,它从一开始就再次启动,然后在消息6上再次崩溃。

这似乎表明,接收者不是一次处理消息,而是同时接收每一条消息,并在每一条消息中运行。我不想这种行为,因为我想扩大到运行5个累犯,我希望负载分布。现在,第一个接收者,我开始接收所有的消息,并开始遍历它们,接收者2-4只需等待新的消息。我希望接收者一次只接收一条信息!

有谁能给我一些提示,说明我是如何实现这个错误的:

来源

putMessagesToQueue.py

代码语言:javascript
复制
import stomp

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

for x in range(0,5):
  conn.send(body="OK-BEFORE-CRASH", destination=destination)

conn.send(body="CRASH", destination=destination)

for x in range(0,50):
  conn.send(body="OK-AFTER-CRASH", destination=destination)

readMessagesFromQueue.py

代码语言:javascript
复制
import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  def __init__(self, processMessage):
    self.processMessage = processMessage
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    self.processMessage(headers, message)

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")

更新001

通过更改接收函数以使用ack =‘client- described’并手动发送ack消息,我设法获得了上述所需的行为。(见下文新版本)

但我仍然无法让接收者一次处理一条信息。这可以通过以下步骤加以说明:

运行putMessagesToQueue.py

  • I的
  1. 运行readMessagesFromQueue2.py -它将启动processing
  2. In一个新的终端运行readMessagesFromQueue2.py

首先,第二个readMessagesFromQueue2在第一个崩溃之前什么也不做,然后开始接收消息。我希望接收者的两个实例从一开始就读取消息。

readMessagesFromQueue2.py

代码语言:javascript
复制
import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  conn = None
  def __init__(self, processMessage, conn):
    self.processMessage = processMessage
    self.conn = conn
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    try:
      self.processMessage(headers, message)
    finally:
      self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")
EN

回答 1

Stack Overflow用户

发布于 2020-05-05 11:10:35

大量阅读不同的文档,我发现了这个问题。

ActiveMQ有一个选项预取大小- https://svn.apache.org/repos/infra/websites/production/activemq/content/5.7.0/what-is-the-prefetch-limit-for.html

如果您有很少的消息需要很长时间来处理,您可以将其设置为1。在其他情况下,这不是一种丙酸盐。

我可以在stopm.py中使用以下行来完成这一任务: conn.subscribe(destination=destination,id=1,ack=‘client- following’,headers={‘activemq.pregetchSize’:1})

因此,使用手动或自动ack既不是这里也不是那里。键将预取限制为1。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61609815

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档