我想弄清楚如何以流式(streaming)的方式编写这段代码:
data = pickle.dumps(obj)
fp = io.BytesIO(data)

通常可以直接调用 pickle.dump,但它需要一个可写的文件对象作为参数。而我这边只有一个 "upload_from_file" 函数,所以我希望把一个文件指针传给这个函数,因此才有了上面的 BytesIO 代码。
这种方式的问题是,数据在内存中被复制了一份;我更希望能够以流式方式处理。
发布于 2021-11-02 13:03:46
方法1:一个线程和一个Python FIFO
正如我在评论中提到的,你需要一个线程和一个FIFO,这会变得有些繁琐。(你可能还需要在FIFO中实现额外的方法,这取决于你实际的upload_from_file函数的需求。)
import os
import queue
import pickle
import threading
from typing import Union
class FIFOStream:
    """Minimal FIFO that hands chunks from a writer thread to a reader thread.

    ``write`` enqueues chunks and ``read`` dequeues them; ``None`` is used as
    the end-of-stream marker. The chunks are held in a ``queue.Queue``, so a
    bounded ``maxsize`` makes ``write`` block while the queue is full.
    """

    def __init__(self, maxsize=0):
        """Create the FIFO.

        Args:
            maxsize: Passed straight to ``queue.Queue``.
        """
        self.queue = queue.Queue(maxsize)
        # Set once the EOF marker has been consumed, so that later reads do
        # not block forever on a queue that will never be filled again.
        self._eof = False

    def write(self, chunk: Union[bytes, None]):
        """Enqueue ``chunk``; pass ``None`` to signal end of stream."""
        if chunk:
            print(f"Queued {len(chunk)} bytes")
        # Enqueued unconditionally: ``None`` is the EOF marker and must reach
        # the reader even though it is falsy.
        self.queue.put(chunk)

    def read(self):
        """Return the next chunk, blocking until one is available.

        Raises:
            EOFError: When the EOF marker is reached, and on every call
                after that.
        """
        if self._eof:
            raise EOFError()
        chunk = self.queue.get(True)
        if chunk is None:  # EOF marker encountered
            self._eof = True
            raise EOFError()
        return chunk
def do_pickling(fifo: "FIFOStream", obj):
    """Pickle ``obj`` into ``fifo`` and then write the EOF marker.

    Args:
        fifo: Destination stream; it receives the pickle output through
            ``write`` followed by a final ``write(None)``.
        obj: Object to serialize with ``pickle.HIGHEST_PROTOCOL``.

    Raises:
        Whatever ``pickle.dump`` raises; the EOF marker is still written.
    """
    try:
        pickle.dump(obj, fifo, protocol=pickle.HIGHEST_PROTOCOL)
    finally:
        # Always signal EOF, even when pickling fails: otherwise a reader
        # blocked in ``fifo.read()`` would wait forever for more data.
        fifo.write(None)  # write EOF marker after Pickle is done
def upload_from_file(fifo):
    """Drain ``fifo`` until it raises ``EOFError``, reporting each chunk.

    Stand-in for a real uploader: every chunk returned by ``fifo.read()`` is
    only measured and reported, and the byte total is printed at the end.
    """

    def drain():
        # Yield chunks until the stream signals end-of-file by raising.
        while True:
            try:
                piece = fifo.read()
            except EOFError:
                return
            yield piece

    total = 0
    for piece in drain():
        total += len(piece)
        print(f"Uploading chunk of size {len(piece)}")
    print(f"Finished uploading {total} bytes!")
def main():
    """Demo: pickle a dict of random blobs in a thread and consume it here.

    A background thread runs ``do_pickling`` into a bounded ``FIFOStream``
    while this thread drains it through ``upload_from_file``.
    """
    payload = dict((key, os.urandom(1024)) for key in range(500))
    # adjust maxsize to something larger in real use :)
    stream = FIFOStream(maxsize=3)
    producer = threading.Thread(target=do_pickling, args=(stream, payload))
    producer.start()
    upload_from_file(stream)
    producer.join()
if __name__ == "__main__":
    main()

这将打印出如下内容:
Queued 66062 bytes
Queued 66057 bytes
Uploading chunk of size 66062
Queued 66057 bytes
Uploading chunk of size 66057
Queued 66057 bytes
Queued 66121 bytes
Queued 66121 bytes
Uploading chunk of size 66057
Queued 66121 bytes
Uploading chunk of size 66057
Uploading chunk of size 66121
Uploading chunk of size 66121
Queued 53727 bytes
Uploading chunk of size 66121
Uploading chunk of size 53727
Finished uploading 516323 bytes!

方法2:线程和系统FIFO
正如ShadowRanger在评论中指出的,你也可以使用os.pipe()来完成此操作,它会提供一个由系统管理的、类似FIFO的文件:
import os
import pickle
import threading
from typing import IO
def upload_from_file(io: IO[bytes], chunk_size: int = 65536):
    """Read ``io`` to EOF in fixed-size chunks, reporting each chunk.

    Stand-in for a real uploader: every chunk is only measured and reported,
    and the byte total is printed at the end.

    Args:
        io: Binary stream to consume; ``read(chunk_size)`` is called on it
            repeatedly.
        chunk_size: Maximum number of bytes requested per read. Defaults to
            65536, the value that was previously hard-coded.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        # A size of 0 would look like an immediate EOF, and a negative size
        # would ask the stream for everything at once.
        raise ValueError("chunk_size must be a positive integer")
    n = 0
    while True:
        try:
            chunk = io.read(chunk_size)
        except EOFError:
            # Tolerate streams that signal EOF by raising.
            break
        if not chunk:  # Empty chunk = EOF
            break
        n += len(chunk)
        print(f"Uploading chunk of size {len(chunk)}")
    print(f"Finished uploading {n} bytes!")
def dump_and_close(data, file: IO[bytes]):
    """Pickle ``data`` into ``file`` and close ``file`` afterwards.

    Args:
        data: Object to serialize with ``pickle.HIGHEST_PROTOCOL``.
        file: Writable binary stream; it is closed before this returns.

    Raises:
        Whatever ``pickle.dump`` raises; ``file`` is still closed.
    """
    try:
        pickle.dump(data, file, protocol=pickle.HIGHEST_PROTOCOL)
    finally:
        # Close even when pickling fails: the reader only sees EOF once the
        # write end is closed, so skipping this would leave it blocked.
        file.close()  # so the reader end will be EOF
def main():
    """Demo: pickle a dict of random blobs through an OS pipe.

    A background thread runs ``dump_and_close`` on the write end of the pipe
    while this thread drains the read end through ``upload_from_file``.
    """
    payload = dict((key, os.urandom(1024)) for key in range(500))
    read_fd, write_fd = os.pipe()
    with open(read_fd, "rb") as reader, open(write_fd, "wb") as writer:
        producer = threading.Thread(
            target=dump_and_close,
            args=(payload, writer),
        )
        producer.start()
        upload_from_file(reader)
        producer.join()
if __name__ == "__main__":
    main()

不过,输出有点乏味。;-)
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 57571
Finished uploading 516323 bytes!

https://stackoverflow.com/questions/69810658
复制相似问题