我正在使用Python3.6中一个启用异步的Kinesis模块来部署到AWS (所以我需要这个模块兼容3.6 )。
我的用例是懒散地从磁盘中读取一个文件(大约100 My压缩- 1GB未压缩),并将数据(每次500行)流到Kinesis。当我正在阅读下一批500行的时候,我希望Kinesis制作人开始把500张唱片推送给Kinesis。
我注意到的是,它一次以500行的方式读取整个文件,然后开始将数据推送给。这似乎是因为我没有打电话给await asyncio.sleep(1),但我也不知道我这样做是否正确。
def lambda_handler(event, context):
event_loop = asyncio.get_event_loop()
# Extract filename from event and download file from S3
event_loop.run_until_complete(process(filename))
pending = asyncio.Task.all_tasks()
event_loop.run_until_complete(asyncio.gather(*pending))
async def process(filename):
for chunk in read_lines(filename, MAX_RECORDS_IN_BATCH):
asyncio.ensure_future(write_kinesis(chunk)).add_done_callback(callback)
def callback(result):
print(str(result))
async def write_kinesis(records):
future = asyncio.ensure_future(producer.put_records(records=records))如果我将await asyncio.sleep(.1)添加到process(filename)函数的末尾,而不是像我想的那样做的话,当然,它实际上是阻塞了.1秒的主线程。
Q-这是一个诀窍吗?用asyncio.sleep阻止它只需要足够长的时间,让Kinesis将数据排除在外?睡眠越少,我在内存中保存的数据就越多,因为移动客户端没有那么多的时间将数据排除出去,但是它会运行得更快(达到某个点)?
Q-我这样做对吗?再次,我正在尝试读取500行,推送到kinesis (异步),读取另一个500,而Kinesis客户端正在工作,清洗和重复。
此外,在查看回调函数的print语句时,我注意到如果write_kinesis函数不返回任何内容,回调的print语句具有result=None,而如果write_kinesis函数返回write_kinesis,则回调的print语句具有result=<Task pending...11b35f18>()。
Q-我假设如果没有返回语句,就没有结果,但是为什么在状态仍然“挂起”的情况下调用回调函数?
编辑1:我忘了提到的,Kinesis已经启用异步。
发布于 2019-03-14 00:17:00
您只需使用等待调用异步函数即可。
async def process(filename):
for chunk in read_lines(filename, MAX_RECORDS_IN_BATCH):
await producer.put_records(records=chunk)
async def run():
# Extract filename from event and download file from S3
await process(filename)
loop = asyncio.get_event_loop()
loop.run_until_complete( run() )
loop.close()https://stackoverflow.com/questions/55150171
复制相似问题