首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RPyC:无法保存图像文件

RPyC:无法保存图像文件
EN

Stack Overflow用户
提问于 2020-02-13 13:06:48
回答 1查看 171关注 0票数 0

我正在开发使用平板扫描仪的Django应用程序。为了保持与扫描仪的连接,使用了RPyC。(连接到扫描仪需要很长时间,所以我想多次重复使用该连接。)

连接到扫描仪并保持服务运行似乎很好。我可以打电话给扫描仪,它马上就会有反应。但是我无法保存PIL图像文件,请参阅下面的错误。如果没有RPyC,就能够保存PIL图像文件。

代码语言:javascript
复制
Traceback (most recent call last):
  File "/usr/lib/python3.6/code.py", line 91, in runcode
    exec(code, self.locals)
  File "<console>", line 1, in <module>
  File "/home/user/Projects/django-project/project/appname/models.py", line 55, in scan_preview
    image_file.save(file_path)
  File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/netref.py", line 166, in __getattr__
    return syncreq(self, consts.HANDLE_GETATTR, name)
  File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/netref.py", line 76, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/protocol.py", line 469, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/async_.py", line 102, in value
    raise self._obj
AttributeError: cannot access 'save'

看起来RPyC想要保存这个图像,我不明白为什么。在保存映像之前关闭连接将导致有关流正在关闭的错误。

我想要的是RPyC在返回PIL映像文件后停止干扰我。,我希望它返回PIL映像,而不是流mambo巨无霸。(对于RPyC,我不知道正确的术语。)

scanner_service.py

代码语言:javascript
复制
import rpyc
import sane


class Scanner(object):
    """
    The Scanner class is used to interact with flatbed scanners.
    """

    def __init__(self):
        sane.init()
        self.device_name = None
        self.error_message = None
        self.device = self.get_device()

    def get_device(self):
        """
        Return the first detected scanner and set the name.

        @return: sane.SaneDev
        """
        devices = sane.get_devices()
        print('Available devices:', devices)

        # Empty list means no scanner is connect or the connected scanner is
        # already being used
        if not devices:
            self.error_message = 'Scanner disconnect or already being used.'
            return None

        # open the first scanner you see
        try:
            device = sane.open(devices[0][0])
        except Exception as e:
            self.error_message = e
            print(e)
            return None

        brand = devices[0][1]
        model = devices[0][2]
        self.device_name = "{brand}_{model}".format(
            brand=brand,
            model=model
        )

        print("Device name:", self.device_name)

        # set to color scanning mode, this is not always the default mode
        device.mode = 'color'

        return device

    def print_options(self):
        """Print the device's options. Useful for development."""
        for opt in self.device.get_options():
            print(opt, '\n')

    def scan_image(self, dpi):
        """
        Scan an image.

        @param dpi: integer
        @return: PIL image
        """
        self.device.resolution = dpi
        self.device.start()
        image = self.device.snap()

        return image

    def scan_roi(
            self,
            top_left_x, top_left_y,
            bottom_right_x, bottom_right_y,
            dpi=2400
    ):
        """
        Scan a Region of Interest.

        The ROI is selected by giving two x/y coordinates, one for the top left
        and one for the bottom right. This creates a square that the scanner
        will scan.
        To create a ROI measure the amount of millimeters starting at the top
        left corner of the document.
        http://www.sane-project.org/html/doc014.html

        @param top_left_x: integer
        @param top_left_y: integer
        @param bottom_right_x: integer
        @param bottom_right_y: integer
        @param dpi: integer
        @return: PIL image
        """
        self.device.resolution = dpi

        # top left x/y
        self.device.tl_x = top_left_x
        self.device.tl_y = top_left_y

        # bottom right x/y
        self.device.br_x = bottom_right_x
        self.device.br_y = bottom_right_y

        self.device.start()
        image = self.device.snap()

        return image


class ScannerService(rpyc.Service):
    """
    Run the scanner as a service.

    Multiple clients can connect to the Scanner Service to use the flatbed
    scanner without having to wait for sane to setup a connection.
    """

    def __init__(self):
        self.scanner = Scanner()

    def on_connect(self, conn):
        pass

    def on_disconnect(self, conn):
        pass

    def exposed_get_device_name(self):
        """
        Return the name of the scanner

        @return: string
        """
        return self.scanner.device_name

    def exposed_scan_image(self, dpi):
        """
        Scan an image.

        @param dpi: integer
        @return: PIL image
        """
        image = self.scanner.scan_image(dpi)
        return image

    def exposed_scan_roi(
            self,
            top_left_x, top_left_y,
            bottom_right_x, bottom_right_y,
            dpi=2400
    ):
        """
        Scan a Region of Interest.

        @param top_left_x: integer
        @param top_left_y: integer
        @param bottom_right_x: integer
        @param bottom_right_y: integer
        @param dpi: integer
        @return: PIL image
       """
        image = self.scanner.scan_roi(
            top_left_x, top_left_y,
            bottom_right_x, bottom_right_y,
            dpi
        )
        return image


if __name__ == "__main__":
    from rpyc.utils.server import ThreadedServer
    t = ThreadedServer(ScannerService(), port=18861)
    t.start()

utils.py

代码语言:javascript
复制
class ScannerServiceConnection(object):
    """Connect to and interact with the scanner service."""

    def __init__(self):
        self.connection = self.get_connection()

    @staticmethod
    def get_connection():
        """
        Connect to the scanner service

        @return: ScannerService object
        """
        connection = rpyc.connect("localhost", 18861)
        return connection

    def get_device_name(self):
        """
        Return the name of the scanner

        @return: string
        """
        return self.connection.root.exposed_get_device_name()

    def scan_image(self, dpi):
        """
        Scan an image.

        @param dpi: integer
        @return: PIL image
        """
        image = self.connection.exposed_scan_image(dpi)
        return image

    def scan_roi(
            self,
            top_left_x, top_left_y,
            bottom_right_x, bottom_right_y,
            dpi=2400
    ):
        """
        Scan a Region of Interest.

        @param top_left_x: integer
        @param top_left_y: integer
        @param bottom_right_x: integer
        @param bottom_right_y: integer
        @param dpi: integer
        @return: PIL image
       """
        image = self.connection.root.exposed_scan_roi(
            top_left_x, top_left_y,
            bottom_right_x, bottom_right_y,
            dpi
        )
        return image
EN

回答 1

Stack Overflow用户

发布于 2020-02-13 14:47:11

我找到了我问题的答案,耶!

我所要做的就是将访问策略更改为以下内容。现在可以访问PIL映像上的保存方法。

我不知道这是不是正确的方法。如果没有,请告诉我。

代码语言:javascript
复制
if __name__ == "__main__":
    from rpyc.utils.server import ThreadedServer
    t = ThreadedServer(
        ScannerService(), port=18861,
        protocol_config={'allow_public_attrs': True}
    )
    t.start()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60208600

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档