首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >google.cloud.pubsub -流拉抢占PubSub消息

google.cloud.pubsub -流拉抢占PubSub消息
EN

Stack Overflow用户
提问于 2018-07-02 11:26:44
回答 1查看 548关注 0票数 0

我目前正在对最新的google-cloud-pubsub==0.35.4公开发行版进行一些测试。我的意图是使用动态数量的订户客户端来处理一个永无止境的流(在负载中发生变化)。

但是,当我排队的时候..。600条消息和1个客户端运行,然后添加其他客户端:

  • 预期:所有剩余的消息都在所有客户端平均分配。
  • 观察到:只有新消息分布在客户端上,任何旧消息都会发送给预先存在的客户端。

下面是我为我的客户端使用的简化版本(作为参考,我们只运行低优先级主题)。我不包括出版商,因为它没有关系。

代码语言:javascript
复制
PRIORITY_HIGH = 1
PRIORITY_MEDIUM = 2
PRIORITY_LOW = 3

MESSAGE_LIMIT = 10
ACKS_PER_MIN = 100.00
ACKS_RATIO = {
    PRIORITY_LOW: 100,
}

PRIORITY_TOPICS = {
    PRIORITY_LOW: 'test_low',
}

PRIORITY_SEQUENCES = {
    PRIORITY_LOW: [PRIORITY_LOW, PRIORITY_MEDIUM, PRIORITY_HIGH],
}


class Subscriber:
    subscriber_client = None
    subscriptions = {}

    priority_queue = defaultdict(Queue.Queue)
    priorities = []

    def __init__(self):
        logging.basicConfig()
        self.subscriber_client = pubsub_v1.SubscriberClient()

        for option, percentage in ACKS_RATIO.iteritems():
            self.priorities += [option] * percentage

    def subscribe_to_topic(self, topic, max_messages=10):
        self.subscriptions[topic] = self.subscriber_client.subscribe(
            BASE_TOPIC_PATH.format(project=PROJECT, topic=topic,),
            self.process_message,
            flow_control=pubsub_v1.types.FlowControl(
                max_messages=max_messages,
            ),
        )

    def un_subscribe_from_topic(self, topic):
        subscription = self.subscriptions.get(topic)
        if subscription:
            subscription.cancel()
            del self.subscriptions[topic]

    def process_message(self, message):
        json_message = json.loads(message.data.decode('utf8'))
        self.priority_queue[json_message['priority']].put(message)

    def retrieve_message(self):
        message = None
        priority = random.choice(self.priorities)
        ack_priorities = PRIORITY_SEQUENCES[priority]

        for ack_priority in ack_priorities:
            try:
                message = self.priority_queue[ack_priority].get(block=False)
                break
            except Queue.Empty:
                pass

        return message


if __name__ == '__main__':
    messages_acked = 0

    pub_sub = Subscriber()
    pub_sub.subscribe_to_topic(PRIORITY_TOPICS[PRIORITY_LOW], MESSAGE_LIMIT * 3)

    while True:
        msg = pub_sub.retrieve_message()
        if msg:
            json_msg = json.loads(msg.data.decode('utf8'))

            msg.ack()
            print ("%s - Akked Priority %s , High %s, Medium %s, Low %s" % (
                datetime.datetime.now().strftime('%H:%M:%S'),
                json_msg['priority'],
                pub_sub.priority_queue[PRIORITY_HIGH].qsize(),
                pub_sub.priority_queue[PRIORITY_MEDIUM].qsize(),
                pub_sub.priority_queue[PRIORITY_LOW].qsize(),
            ))

        time.sleep(60.0 / ACKS_PER_MIN)

我想知道这种行为是否与流如何拉动功能一样固有,或者是否有一些配置可以改变这种行为。

干杯!

EN

回答 1

Stack Overflow用户

发布于 2018-10-18 23:22:41

考虑到云Pub/Sub文档,Cloud /sub至少为每个订阅提供一次发布的消息,但是这种行为有一些例外:

  • 删除不能在7天的最长保留时间内传递的邮件。
  • 在创建给定订阅之前发布的消息将不会传递。

换句话说,服务将把消息传递给消息发布之前创建的订阅,因此旧消息将无法用于新订阅。据我所知,Cloud /Sub没有提供一个功能来改变这种行为。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51135119

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档