注:这个问题与跃迁的FSM库有关
我正在寻找一种方法,在中将它们作为列表来依次解决方法回调,在之前准备或/和,或者在之后准备。我正在使用来自AsyncMachine的transitions.extensions.asyncio模块
预期结果:
1Done_2Done_3Done
Getting:
None_3Done
复制当前情况的示例代码:
import asyncio
from transitions.extensions.asyncio import AsyncMachine
class Model:
STATES = ['A', 'B']
TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
]
def __init__(self, name, state='initial'):
self.name = name
self.state = state
self.attribute_1 = None
self.attribute_2 = None
self.attribute_3 = None
async def initialize1(self):
await asyncio.sleep(1) # This is expensive operation and will take some time.
self.attribute_1 = '1Done'
print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)
async def initialize2(self):
await asyncio.sleep(0.5) # This is expensive operation and will take some time.
self.attribute_2 = f'{self.attribute_1}_2Done'
print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)
async def initialize3(self):
self.attribute_3 = f'{self.attribute_2}_3Done'
print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)
async def show_attributes(self):
print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')
machine = AsyncMachine(
model=None,
states=Model.STATES,
transitions=Model.TRANSITIONS,
initial=None,
queued='model'
# queued=True
)
async def main():
model1 = Model(name='Model1', state='A')
machine.add_model(model1, initial=model1.state)
await machine.dispatch('next')
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())正如代码'prepare': ['initialize1', 'initialize2', 'initialize3']中所示,我正在寻找一种方法来调用initialize2 (一旦initialize1被解析)和initialize3 (一旦同时解析了initialize1和initialize2方法)。目前,它们被称为并行(),这是一个很好的特性,但是如果有一种方法可以让它们按顺序执行/解析,那就太棒了。
当然,我可以再添加一个像initialize_all这样的方法,然后在其中调用所有上述方法。但是想一想,为了处理现实世界的问题,我需要不断增加多少新的方法。我想使我的功能可重用和更小,只是为了一个特定的任务。
发布于 2021-08-02 12:20:05
我觉得你的“方法2”看起来不错。如果您知道所有回调都应该按顺序执行,并且根本不需要并行执行,则还可以使用以下方法覆盖await_all:
class EnhancedMachine(AsyncMachine):
@staticmethod
async def await_all(callables):
return [await func() for func in callables]如果您切换元组/列表的含义,您可以将代码缩短到如下所示:
class EnhancedMachine(AsyncMachine):
async def callbacks(self, func_groups, event_data):
results = []
for funcs in func_groups:
if isinstance(funcs, (list, tuple)):
results.extend(await self.await_all(
[partial(event_data.machine.callback, func, event_data)
for func in funcs]
))
else:
results.append(await self.callback(funcs, event_data))
return results这就启用了回调注释,比如[stage_1, (stage_2a, stage_2b, stage_2c), stage_3],其中每个阶段都是顺序执行的,但是子阶段是并行调用的。
https://stackoverflow.com/questions/68442841
复制相似问题