首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多资源同步阻塞

多资源同步阻塞
EN

Stack Overflow用户
提问于 2015-11-28 07:59:01
回答 2查看 460关注 0票数 1

抽象情境。我们有两个绵羊,我们可以在时间异步使用(信号量(2))和一个门,我们可以在时间上使用。我们想让羊通过门2次(每次我们需要1只羊和1只门,持续1秒)和喂养1次羊(需要1只羊和2秒)。下面是代码示例:

代码语言:javascript
复制
import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 avaliable sheeps at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 avaliable gate at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await asyncio.gather(
        sheep.acquire(), 
        gate.acquire()
    )  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await  asyncio.gather(
        sheep.acquire()
    )  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

代码语言:javascript
复制
acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]

问题是程序不能以最优的方式工作,在输出的第3行中的原因是:spend 2阻塞绵羊,而不能立即使用它,它应该等待被spend 1阻塞的单门。第二,可以在spend 1浪费时间的情况下饲养的羊:

程序应该如何工作的最佳方式:spend 1阻塞1只羊和1只羊,spend 2看到那扇门被阻塞,没有理由立即阻止第二只羊。feed 1可以在spend 1运行时阻塞第二只羊并运行。在这种情况下,程序将在2秒内完成,而不是3秒:

很容易看出你是否改变了main的集合内的顺序。

资源不仅要并行不悖,而且要同步,只有羊和门是可用的,才能封锁羊和门。就像这样:

代码语言:javascript
复制
while sheep.locked() or gate.locked():
    asyncio.sleep(0)
await asyncio.gather(
    sheep.acquire(), 
    gate.acquire()
)

但这看起来不像是一个通用的、好的解决方案。是否存在解决这一问题的任何模式或更好的方法?任何想法都欢迎。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-12-02 09:11:34

基于这个解决方案,我为这个示例创建了解决方案。我们需要两件事:

  1. locked()函数添加到SheepGate中,即检查现在是否可以获取对象
  2. 添加并使用新的MultiAcquire任务,该任务将只获取对象,如果所有对象都可以立即获得,则只获取(否则暂停发布事件)

下面是最后的代码,请参阅MultiAcquire -它的主要内容是:

代码语言:javascript
复制
import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 avaliable sheeps at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 avaliable gate at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class MultiAcquire(asyncio.Task):
    _check_lock = asyncio.Lock()  # to suspend for creating task that acquires objects
    _release_event = asyncio.Event()  # to suspend for any object was released

    def __init__(self, locks):
        super().__init__(self._task_coro())
        self._locks = locks
        # Here we use decorator to subscribe all release() calls,
        # _release_event would be set in this case:
        for l in self._locks:
            l.release = self._notify(l.release)

    async def _task_coro(self):
        while True:
            # Create task to acquire all locks and break on success:
            async with type(self)._check_lock:
                if not any(l.locked() for l in self._locks):  # task would be created only if all objects can be acquired
                    task = asyncio.gather(*[l.acquire() for l in self._locks])  # create task to acquire all objects 
                    await asyncio.sleep(0)  # start task without waiting for it
                    break
            # Wait for any release() to try again:
            await type(self)._release_event.wait()
        # Wait for task:
        return await task

    def _notify(self, func):
        def wrapper(*args, **kwargs):
            type(self)._release_event.set()
            type(self)._release_event.clear()
            return func(*args, **kwargs)
        return wrapper


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

代码语言:javascript
复制
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]
票数 1
EN

Stack Overflow用户

发布于 2015-11-30 12:58:48

您可以实现一个处理多个锁的异步上下文管理器。此对象应确保在等待另一个不可用的锁时不会持有任何锁:

代码语言:javascript
复制
class multilock(asyncio.locks._ContextManagerMixin):

    def __init__(self, *locks):
        self.released = list(locks)
        self.acquired = []

    async def acquire(self):
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                self.release()
            await lock.acquire()
            self.acquired.append(lock)

    def release(self):
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

示例:

代码语言:javascript
复制
async def test(lock1, lock2):
    async with multilock(lock1, lock2):
        print('Do something')
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33969075

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档