我在kombu中有两个队列;一个用于提交请求(做某事),另一个用于通过pub/sub显示所述请求的增量状态。因此,在我的流程中,它将发布到请求队列,并在响应队列上使用。由于该任务可能需要一些时间,因此我希望为用户提供关于后端发生的情况的反馈;所有这些都在命令行上工作,因为我的kombu consume回调允许我添加一条logging.info()语句,将信息传回给我的用户:
def callback( msg, env ):
logging.info( str(msg) )
consumer.register_callback( callback )
consumer.consume()
while continue_consuming:
connection.drain_events()但是,我现在希望能够在django中提供相同的功能。我知道我可以创建一个generator函数作为HttpResponse对象的输入:
def view( reqeust ):
HttpResponse( gen() )
def gen():
yield 'streaming... '但我无法概念化如何将kombu队列的消息回调实现为生成器来提供此功能……有什么想法吗?
如果可能的话,我希望避免使用数据库层来存储进度/结果。
发布于 2011-06-14 02:01:38
最后,我决定稍微重新组织一下代码;因为我在kombu队列周围有一个包装器,以使接口更像multiprocess.Queue,所以我为我的get()方法创建了一个生成器。
def get( self, until=None ):
if until == None:
until = self.end_marker
for c in count():
m = self.consumer.queues[0].get( True )
if not m == None:
if m.payload == until:
raise StopIteration
yield m.payload这看起来工作得很好--但并不是那么干净,因为我需要知道self.end_marker或util是什么,也可能需要遍历所有的消费者队列(但我的类是每个对象的队列,所以这并不是太糟糕)。
那么在我看来,我所做的就是:
def view( response ):
q = Queue()
return HttpResponse( q.get() )有许多关于各种中间件的帖子;我只是懒得使用它们,而且它似乎工作得很好。
https://stackoverflow.com/questions/6297842
复制相似问题