首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >queue.Queue上的多路复用?

queue.Queue上的多路复用?
EN

Stack Overflow用户
提问于 2011-12-10 12:31:57
回答 4查看 4.4K关注 0票数 12

如何同时在多个queue.Queue上进行“选择”?

Golang的期望特征有它的频道:

代码语言:javascript
复制
select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

其中,去块的第一通道执行对应的块。我如何在Python中实现这一点?

Update0

按照在链接中提供的每个晚礼服21b的回答,所需的队列类型具有以下属性:

  • 多生产者/多消费者队列(MPMC)
  • 提供每个生产者FIFO/LIFO
  • 当队列为空/满时,消费者/生产者被阻塞

此外,通道可能阻塞,生产者将阻塞,直到消费者检索该项目。我不确定Python的队列是否能做到这一点。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2011-12-16 20:46:14

生产者-消费者队列有许多不同的实现,比如queue.Queue可用。它们通常在许多属性上有所不同,比如德米特里·维尤科夫( Dmitry )在这个优秀文章上列出的属性。正如你所看到的,有超过10k不同的组合可能。这些队列所使用的算法也因需求的不同而有很大差异。不可能仅仅扩展现有的队列算法来保证额外的属性,因为这通常需要不同的内部数据结构和不同的算法。

Go的通道提供了相对较多的有保证的属性,因此这些通道可能适合于许多程序。最困难的要求之一是支持一次在多个通道上读取/阻塞(select语句),并在select语句中能够进行多个分支时公平地选择一个通道,这样就不会留下任何消息。Python的queue.Queue不提供这些特性,因此不可能用它来存档相同的行为。

因此,如果您想继续使用queue.Queue,您需要找到解决该问题的方法。然而,这些解决方案也有它们自己的缺点,而且很难维护。寻找另一个生产者-消费者队列,提供您需要的功能可能是一个更好的想法!总之,这里有两个可能的解决办法:

轮询

代码语言:javascript
复制
while True:
  try:
    i1 = c1.get_nowait()
    print "received %s from c1" % i1
  except queue.Empty:
    pass
  try:
    i2 = c2.get_nowait()
    print "received %s from c2" % i2
  except queue.Empty:
    pass
  time.sleep(0.1)

在轮询通道时,这可能会占用大量的CPU周期,当有大量消息时,可能会比较慢。使用带指数后退时间的time.sleep() (而不是这里显示的常数0.1秒)可能会大大改进这个版本。

单通知队列

代码语言:javascript
复制
queue_id = notify.get()
if queue_id == 1:
  i1 = c1.get()
  print "received %s from c1" % i1
elif queue_id == 2:
  i2 = c2.get()
  print "received %s from c2" % i2

使用此设置,您必须在发送到c1或c2后向通知队列发送一些内容。这可能适用于您,只要只有一个这样的通知队列对您来说就足够了(也就是说,您没有多个“选择”,每个阻塞在您的通道的不同子集上)。

或者,您也可以考虑使用Go。无论如何,Go的goroutines和并发支持比Python有限的线程功能强大得多。

票数 2
EN

Stack Overflow用户

发布于 2011-12-10 13:10:20

如果使用queue.PriorityQueue,则可以使用通道对象作为优先级来获得类似的行为:

代码语言:javascript
复制
import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

示例输出:

代码语言:javascript
复制
Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

在本例中,ChannelManager只是queue.PriorityQueue的包装器,它将select方法实现为contextmanager,使其看起来类似于Go中的select语句。

有几件事要注意:

  • 排序
代码语言:javascript
复制
- In the Go example, the order in which the channels are written inside the `select` statement determines which channel's code will be executed if there's data available for more than one channel.
- In the python example the order is determined by the priority assigned to each channel. However, the priority can be dinamically assigned to each channel (as seen in the example), so changing the ordering would be possible with a more complex `select` method that takes care of assigning new priorities based on an argument to the method. Also, the old ordering could be reestablished once the context manager is finished.

  • 阻塞
代码语言:javascript
复制
- In the Go example, the `select` statement is blocking if a `default` case exists.
- In the python example, a boolean argument has to be passed to the `select` method to make it clear when blocking/non-blocking is desired. In the non-blocking case, the channel returned by the context mananager is just the string `'default'` so it's easy in the code inside to detect this in the code inside the `with` statement.

  • 线程:如本例所示,queue模块中的对象已经为多生产者、多消费者场景做好了准备。
票数 3
EN

Stack Overflow用户

发布于 2013-06-08 20:50:49

比钱斯项目复制Python中的Go通道,包括多路复用。它实现了与Go相同的算法,因此满足了所有您想要的属性:

  • 多个生产者和消费者可以通过成龙进行沟通。当生产者和消费者都准备好时,这对就会阻塞。
  • 生产者和消费者按他们到达的顺序得到服务(FIFO)
  • 空(满)队列将阻塞使用者(生产者)。

下面是您的示例:

代码语言:javascript
复制
c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise

(完全披露:我写了这个图书馆)

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/8456516

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档