我在运行一条分析管道。
这是我的主题和订阅:
gcloud pubsub topics create pipeline-input
gcloud beta pubsub subscriptions create pipeline-input-sub \
--topic pipeline-input \
--ack-deadline 600 \
--expiration-period never \
--dead-letter-topic dead-letter以下是我如何提取信息:
import { PubSub, Message } from '@google-cloud/pubsub'
const pubSubClient = new PubSub()
const queue: Message[] = []
const populateQueue = async () => {
const subscription = pubSubClient.subscription('pipeline-input-sub', {
flowControl: {
maxMessages: 5
}
})
const messageHandler = async (message: Message) => {
queue.push(message)
}
subscription.on('message', messageHandler)
}
const processQueueMessage = () => {
const message = queue.shift()
try {
...
message.ack()
} catch {
...
message.nack()
}
processQueueMessage()
}
processQueueMessage()处理时间为~7秒。
以下是许多类似的dup案例之一。同样的信息被传递给5(!!)不同GCE实例的时间:
所有5次消息都已成功处理并进行了.ack()编辑。输出包含比输入多50%的消息!我非常了解“至少一次”行为,但我认为它可能复制0.01%的消息,而不是50%的消息。
主题输入100%不存在重复项。我通过Cloud验证了主题输入方法和未加标记消息的数量。数字匹配:在pub/子主题中没有重复项。
更新:
发布于 2021-10-05 11:46:50
一些重复是预期的,虽然50%的重复率肯定很高。第一个问题是,这些是发布端的副本还是订阅方的副本?前者是在重新尝试同一消息的发布时创建的,从而导致同一消息的多个发布。这些消息将具有不同的消息ID。后者是由将相同的消息重传给订阅者造成的。这些消息具有相同的消息ID (尽管有不同的ack ID)。
听起来你已经证实了这些都是订阅端的副本。因此,正如您所提到的,可能的原因是过期的最后期限。问题是,为什么这些信息超过了截止日期?需要注意的一点是,在使用客户端库时,订阅中设置的ack截止日期不是所使用的。相反,客户端库试图根据客户端库设置和第99百分位数的ack延迟来优化ack截止日期。然后,它会更新消息的租约,直到对象传递到subscribe方法。这默认为一个小时。
因此,为了使消息保持租用状态,客户端库必须能够向服务器发送modifyAckDeadline请求。造成重复的一个可能原因是客户端无法发送这些请求,这可能是由于机器上的过载。运行这条管道的机器还有其他工作吗?如果是这样的话,它们就有可能在CPU、内存或网络方面超载,无法发送modifyAckDeadline请求,也无法及时处理消息。
消息批处理也可能会影响您对消息进行攻击的能力。作为优化,Pub/Sub系统存储批消息的确认,而不是单个消息。因此,必须确认批处理中的所有消息,才能确认所有消息。因此,如果您在一个批处理中有五个消息,并且确认其中四个消息,但是不要破坏最后一个消息,那么所有五个消息都将被重新传递。有一些缓存试图最小化这一点,但这仍然是一种可能性。有一个中型员额会更详细地讨论这个问题(请参阅“消息重发和复制率”一节)。可能值得检查的是,所有消息都在您的代码中被添加,而不是在您的代码中被插入,方法是在消息一收到并在调用ack和nack之前打印出消息ID。如果您的消息是批量发布的,那么单个nack可能会导致更多消息的重新传递。
批处理和复制之间的这种耦合是我们正在积极改进的东西。我希望这个问题会在某一时刻停止。同时,如果您控制了发布服务器,则可以将批处理设置中的max_messages属性设置为1,以防止消息的批处理。
如果所有这些都没有帮助,最好打开一个支持用例,并提供一些重复消息的项目名称、订阅名和消息ID。工程师可以更详细地研究为什么单个信息被重新传递。
https://stackoverflow.com/questions/69421959
复制相似问题