首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么我要复制50%的GCP Pub/Sub消息?

为什么我要复制50%的GCP Pub/Sub消息?
EN

Stack Overflow用户
提问于 2021-10-03 03:53:44
回答 1查看 1.1K关注 0票数 3

我在运行一条分析管道。

  • 吞吐量为每秒11条消息。
  • 我的Pub/Sub主题包含约200万条预定消息。
  • 80个GCE实例正在并行地提取消息。

这是我的主题和订阅:

代码语言:javascript
复制
gcloud pubsub topics create pipeline-input

gcloud beta pubsub subscriptions create pipeline-input-sub \
    --topic pipeline-input \
    --ack-deadline 600 \
    --expiration-period never \
    --dead-letter-topic dead-letter

以下是我如何提取信息:

代码语言:javascript
复制
import { PubSub, Message } from '@google-cloud/pubsub'

const pubSubClient = new PubSub()

const queue: Message[] = []

const populateQueue = async () => {
  const subscription = pubSubClient.subscription('pipeline-input-sub', {
    flowControl: {
      maxMessages: 5
    }
  })
  const messageHandler = async (message: Message) => {
    queue.push(message)
  }
  subscription.on('message', messageHandler)
}

const processQueueMessage = () => {
  const message = queue.shift()
  try {
    ...
    message.ack()
  } catch {
    ...
    message.nack()
  }
  processQueueMessage()
}

processQueueMessage()

处理时间为~7秒。

以下是许多类似的dup案例之一。同样的信息被传递给5(!!)不同GCE实例的时间:

  • 03:37:42.377
  • 03:45:20.883
  • 03:48:14.262
  • 04:01:33.848
  • 05:57:45.141

所有5次消息都已成功处理并进行了.ack()编辑。输出包含比输入多50%的消息!我非常了解“至少一次”行为,但我认为它可能复制0.01%的消息,而不是50%的消息。

主题输入100%不存在重复项。我通过Cloud验证了主题输入方法和未加标记消息的数量。数字匹配:在pub/子主题中没有重复项。

更新:

  1. 它看起来像所有那些复制创建的,因为它的最后期限到期。我百分之百肯定,在600秒的截止日期之前,我已经确认了99.9%的信息。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-05 11:46:50

一些重复是预期的,虽然50%的重复率肯定很高。第一个问题是,这些是发布端的副本还是订阅方的副本?前者是在重新尝试同一消息的发布时创建的,从而导致同一消息的多个发布。这些消息将具有不同的消息ID。后者是由将相同的消息重传给订阅者造成的。这些消息具有相同的消息ID (尽管有不同的ack ID)。

听起来你已经证实了这些都是订阅端的副本。因此,正如您所提到的,可能的原因是过期的最后期限。问题是,为什么这些信息超过了截止日期?需要注意的一点是,在使用客户端库时,订阅中设置的ack截止日期不是所使用的。相反,客户端库试图根据客户端库设置和第99百分位数的ack延迟来优化ack截止日期。然后,它会更新消息的租约,直到对象传递到subscribe方法。这默认为一个小时。

因此,为了使消息保持租用状态,客户端库必须能够向服务器发送modifyAckDeadline请求。造成重复的一个可能原因是客户端无法发送这些请求,这可能是由于机器上的过载。运行这条管道的机器还有其他工作吗?如果是这样的话,它们就有可能在CPU、内存或网络方面超载,无法发送modifyAckDeadline请求,也无法及时处理消息。

消息批处理也可能会影响您对消息进行攻击的能力。作为优化,Pub/Sub系统存储批消息的确认,而不是单个消息。因此,必须确认批处理中的所有消息,才能确认所有消息。因此,如果您在一个批处理中有五个消息,并且确认其中四个消息,但是不要破坏最后一个消息,那么所有五个消息都将被重新传递。有一些缓存试图最小化这一点,但这仍然是一种可能性。有一个中型员额会更详细地讨论这个问题(请参阅“消息重发和复制率”一节)。可能值得检查的是,所有消息都在您的代码中被添加,而不是在您的代码中被插入,方法是在消息一收到并在调用acknack之前打印出消息ID。如果您的消息是批量发布的,那么单个nack可能会导致更多消息的重新传递。

批处理和复制之间的这种耦合是我们正在积极改进的东西。我希望这个问题会在某一时刻停止。同时,如果您控制了发布服务器,则可以将批处理设置中的max_messages属性设置为1,以防止消息的批处理。

如果所有这些都没有帮助,最好打开一个支持用例,并提供一些重复消息的项目名称、订阅名和消息ID。工程师可以更详细地研究为什么单个信息被重新传递。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69421959

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档