首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python Pickle流

Python Pickle流
EN

Stack Overflow用户
提问于 2021-11-02 12:42:28
回答 1查看 131关注 0票数 0

我想弄清楚如何以一种流畅的方式编写这段代码:

代码语言:javascript
复制
data = pickle.dumps(obj)
fp = io.BytesIO(data)

通常可以调用pickle.dump,但这需要提供编写器文件作为参数。相反,我有一个"upload_from_file“函数,所以我希望将文件指针传递给该函数,因此是BytesIO代码。

这种方式的问题是,我在内存中复制了数据,我更希望它是可流式的。

EN

回答 1

Stack Overflow用户

发布于 2021-11-02 13:03:46

方法1:一个线程和一个Python FIFO

正如我在评论中提到的,你需要一个线程和一个FIFO,它会变得很麻烦。(您可能需要在FIFO中实现额外的方法,这取决于您的实际upload_from_file函数的需求)。

代码语言:javascript
复制
import os
import queue
import pickle
import threading
from typing import Union


class FIFOStream:
    def __init__(self, maxsize=0):
        self.queue = queue.Queue(maxsize)

    def write(self, chunk: Union[bytes, None]):
        if chunk:
            print(f"Queued {len(chunk)} bytes")
        self.queue.put(chunk)

    def read(self):
        chunk = self.queue.get(True)
        if chunk is None:  # EOF marker encountered
            raise EOFError()
        return chunk


def do_pickling(fifo: FIFOStream, obj):
    pickle.dump(obj, fifo, protocol=pickle.HIGHEST_PROTOCOL)
    fifo.write(None)  # write EOF marker after Pickle is done


def upload_from_file(fifo):
    n = 0
    while True:
        try:
            chunk = fifo.read()
        except EOFError:
            break
        n += len(chunk)
        print(f"Uploading chunk of size {len(chunk)}")
    print(f"Finished uploading {n} bytes!")


def main():
    data = {a: os.urandom(1024) for a in range(500)}
    fifo = FIFOStream(maxsize=3)  # adjust maxsize to something larger in real use :)
    dumper = threading.Thread(target=do_pickling, args=(fifo, data))
    dumper.start()
    upload_from_file(fifo)
    dumper.join()


if __name__ == "__main__":
    main()

这将打印出如下内容

代码语言:javascript
复制
Queued 66062 bytes
Queued 66057 bytes
Uploading chunk of size 66062
Queued 66057 bytes
Uploading chunk of size 66057
Queued 66057 bytes
Queued 66121 bytes
Queued 66121 bytes
Uploading chunk of size 66057
Queued 66121 bytes
Uploading chunk of size 66057
Uploading chunk of size 66121
Uploading chunk of size 66121
Queued 53727 bytes
Uploading chunk of size 66121
Uploading chunk of size 53727
Finished uploading 516323 bytes!

方法2:线程和系统FIFO

正如ShadowRanger在注释中指出的,您也可以使用os.pipe()完成此操作,它为您提供了一个由系统管理的FIFOesque文件:

代码语言:javascript
复制
import os
import pickle
import threading
from typing import IO


def upload_from_file(io: IO[bytes]):
    n = 0
    while True:
        try:
            chunk = io.read(65536)
        except EOFError:
            break
        if not chunk:  # Empty chunk = EOF
            break
        n += len(chunk)
        print(f"Uploading chunk of size {len(chunk)}")
    print(f"Finished uploading {n} bytes!")


def dump_and_close(data, file: IO[bytes]):
    pickle.dump(data, file, protocol=pickle.HIGHEST_PROTOCOL)
    file.close()  # so the reader end will be EOF


def main():
    data = {a: os.urandom(1024) for a in range(500)}
    r_fd, w_fd = os.pipe()
    with open(r_fd, "rb") as r_file, open(w_fd, "wb") as w_file:
        dumper = threading.Thread(target=dump_and_close, args=(data, w_file))
        dumper.start()
        upload_from_file(r_file)
        dumper.join()


if __name__ == "__main__":
    main()

不过,输出有点乏味。;-)

代码语言:javascript
复制
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 57571
Finished uploading 516323 bytes!
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69810658

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档