我有一个leetle测试程序,它不工作,除非我做了一个client.flush()之前的client.close()。对nats/aio/client.py的天真解读似乎表明close()做了某种flush(),所以我不明白为什么我的测试程序会失败。是否有必要总是在flush()之前使用close()?示例程序似乎没有表明这一点。
我可以看到close()调用_flush_pending(),但这与flush()有很大的不同
flush()调用_send_ping()发送PING并等待PONG响应。_send_ping()直接写入self._io_writer,然后调用_flush_pending()。_flush_pending()将一个None (我想任何事情都可以)推入self._flush_queue。这可能会唤醒_flusher()并使其将self._pending中的所有内容写入self._io_writer。publish()调用_send_command()将消息推送到self._pending上,然后调用_flush_pending()使_flusher()编写所有内容。测试程序:
#!/usr/bin/env python3.5
import asyncio
import nats.aio.client
import nats.aio.errors
async def send_message(loop):
mq_url = "nats://nats:password@127.0.0.1:4222"
client = nats.aio.client.Client()
await client.connect(io_loop=loop, servers=[mq_url])
await client.publish("test_subject", "test1".encode())
#await client.flush()
await client.close()
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(send_message(loop))
loop.close()
if __name__ == '__main__':
main()FWIW如果我发送了大量的消息,那么,在某个时候(还没有确定确切的条件),就会发送消息。我们注意到了在处理文件集合并为文件中的每一行发送消息时的行为:一些文件正在通过,而另一些文件则没有,结果是更大的文件(更多的行)形成了它。所以看起来好像有一些内部缓冲区被填满了,这就强制进行了刷新。
发布于 2017-05-05 02:58:18
看起来可能是刚刚修复的bug:https://github.com/nats-io/asyncio-nats/pull/35
https://stackoverflow.com/questions/43749659
复制相似问题