抽象情境。我们有两个绵羊,我们可以在时间异步使用(信号量(2))和一个门,我们可以在时间上使用。我们想让羊通过门2次(每次我们需要1只羊和1只门,持续1秒)和喂养1次羊(需要1只羊和2秒)。下面是代码示例:
import asyncio
class Sheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
def release(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
class Gate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
def release(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
async def spend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await asyncio.gather(
sheep.acquire(),
gate.acquire()
) # block 1 sheep, 1 gate
await asyncio.sleep(1) # 1 second
print('Spend sheep through a gate')
sheep.release()
gate.release()
async def feed(reason):
sheep = Sheep(reason)
await asyncio.gather(
sheep.acquire()
) # block 1 sheep
await asyncio.sleep(2) # 2 seconds
print('Feed sheep')
sheep.release()
async def main():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 time
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())输出:
acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]问题是程序不能以最优的方式工作,在输出的第3行中的原因是:spend 2阻塞绵羊,而不能立即使用它,它应该等待被spend 1阻塞的单门。第二,可以在spend 1浪费时间的情况下饲养的羊:

程序应该如何工作的最佳方式:spend 1阻塞1只羊和1只羊,spend 2看到那扇门被阻塞,没有理由立即阻止第二只羊。feed 1可以在spend 1运行时阻塞第二只羊并运行。在这种情况下,程序将在2秒内完成,而不是3秒:

很容易看出你是否改变了main的集合内的顺序。
资源不仅要并行不悖,而且要同步,只有羊和门是可用的,才能封锁羊和门。就像这样:
while sheep.locked() or gate.locked():
asyncio.sleep(0)
await asyncio.gather(
sheep.acquire(),
gate.acquire()
)但这看起来不像是一个通用的、好的解决方案。是否存在解决这一问题的任何模式或更好的方法?任何想法都欢迎。
发布于 2015-12-02 09:11:34
基于这个解决方案,我为这个示例创建了解决方案。我们需要两件事:
locked()函数添加到Sheep和Gate中,即检查现在是否可以获取对象MultiAcquire任务,该任务将只获取对象,如果所有对象都可以立即获得,则只获取(否则暂停发布事件)下面是最后的代码,请参阅MultiAcquire -它的主要内容是:
import asyncio
class Sheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
def release(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
def locked(self):
return type(self)._sem.locked()
class Gate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
def release(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
def locked(self):
return type(self)._sem.locked()
class MultiAcquire(asyncio.Task):
_check_lock = asyncio.Lock() # to suspend for creating task that acquires objects
_release_event = asyncio.Event() # to suspend for any object was released
def __init__(self, locks):
super().__init__(self._task_coro())
self._locks = locks
# Here we use decorator to subscribe all release() calls,
# _release_event would be set in this case:
for l in self._locks:
l.release = self._notify(l.release)
async def _task_coro(self):
while True:
# Create task to acquire all locks and break on success:
async with type(self)._check_lock:
if not any(l.locked() for l in self._locks): # task would be created only if all objects can be acquired
task = asyncio.gather(*[l.acquire() for l in self._locks]) # create task to acquire all objects
await asyncio.sleep(0) # start task without waiting for it
break
# Wait for any release() to try again:
await type(self)._release_event.wait()
# Wait for task:
return await task
def _notify(self, func):
def wrapper(*args, **kwargs):
type(self)._release_event.set()
type(self)._release_event.clear()
return func(*args, **kwargs)
return wrapper
async def spend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await MultiAcquire([sheep, gate]) # block 1 sheep, 1 gate
await asyncio.sleep(1) # 1 second
print('Spend sheep through a gate')
sheep.release()
gate.release()
async def feed(reason):
sheep = Sheep(reason)
await MultiAcquire([sheep]) # block 1 sheep
await asyncio.sleep(2) # 2 seconds
print('Feed sheep')
sheep.release()
async def main():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 time
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())输出:
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]发布于 2015-11-30 12:58:48
您可以实现一个处理多个锁的异步上下文管理器。此对象应确保在等待另一个不可用的锁时不会持有任何锁:
class multilock(asyncio.locks._ContextManagerMixin):
def __init__(self, *locks):
self.released = list(locks)
self.acquired = []
async def acquire(self):
while self.released:
lock = self.released.pop()
if lock.locked():
self.release()
await lock.acquire()
self.acquired.append(lock)
def release(self):
while self.acquired:
lock = self.acquired.pop()
lock.release()
self.released.append(lock)示例:
async def test(lock1, lock2):
async with multilock(lock1, lock2):
print('Do something')https://stackoverflow.com/questions/33969075
复制相似问题