首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Urwid和多重处理

Urwid和多重处理
EN

Stack Overflow用户
提问于 2022-10-13 08:40:29
回答 2查看 39关注 0票数 1

我尝试在urwid中对一些操作进行排序,我制作了一个在后台运行的计时器,并与主要进程通信,如下所示:

代码语言:javascript
复制
from multiprocessing import Process, Pipe
import time
import urwid

def show_or_exit(key):
    if key in ('q', 'Q'):
        raise urwid.ExitMainLoop()

class midiloop(urwid.Frame):
    def __init__(self):
        self.message = urwid.Text('Press Space', align='center')
        self.filler = urwid.Filler(self.message, "middle")
        super().__init__(urwid.Frame(self.filler))
    
    def keypress(self, size, key):
        if key == " ":
            self.seq()
        else:
            return key

    def timer(self,conn):
        x = 0
        while True:
            if (conn.poll() == False):
                pass
            else:
                z = conn.recv()
                if (z == "kill"):
                    return()
            conn.send(x)
            x+=1
            time.sleep(0.05)
    
    def seq(self):
        self.parent_conn, self.child_conn = Pipe()
        self.p = Process(target=self.timer, args=(self.child_conn,))
        self.p.start()
        while True:
            if (self.parent_conn.poll(None)):
                self.y = self.parent_conn.recv()
                self.message.set_text(str(self.y))
                loop.draw_screen()
                if ( self.y > 100 ):
                    self.parent_conn.send("kill")
                    self.message.set_text("Press Space")
                    return()

if __name__ == '__main__':
    midiloop = midiloop()
    loop = urwid.MainLoop(midiloop, unhandled_input=show_or_exit, handle_mouse=True)
    loop.run()

问题是,我用while阻塞urwid主循环:所以任何人都可以给我一个解决方案,在程序到达循环结束之前,听q键退出程序,更广泛地说,它可以与urwid交互,并与子进程通信。

EN

回答 2

Stack Overflow用户

发布于 2022-10-17 21:10:52

将多处理和urwid结合起来似乎相当复杂。

由于您使用的是计时器,而且您的类名为midiloop,所以我猜您可能想要实现一个迷你排序器。

一种可能的实现方法是使用异步循环而不是urwid的MainLoop,并使用loop.call_later()函数来调度事件。过去,我用这种方法实现了一台简单的鼓机,使用乌维德绘制排序器,使用asyncio调度播放事件,使用简单音频播放。您可以在这里看到代码:https://github.com/eliasdorneles/kickit

如果您仍然希望实现多进程通信,我认为最好的方法是使用urwid.AsyncioEventLoop辅助对象进行双工通信。

票数 0
EN

Stack Overflow用户

发布于 2022-11-25 20:09:32

恐怕不是很小。然而,我确实花了一天时间编写这个Urwid前端,它启动、停止并与子流程进行交流。

代码语言:javascript
复制
import os
import sys
from multiprocessing import Process, Pipe, Event
from collections import deque
import urwid


class suppress_stdout_stderr(object):
    """
    Supresses the stdout and stderr by piping them to dev null...
    The same place I send bad faith replies to my tweets
    """
    def __enter__(self):
        self.outnull_file = open(os.devnull, 'w')
        self.errnull_file = open(os.devnull, 'w')

        self.old_stdout_fileno_undup = sys.stdout.fileno()
        self.old_stderr_fileno_undup = sys.stderr.fileno()

        self.old_stdout_fileno = os.dup(sys.stdout.fileno())
        self.old_stderr_fileno = os.dup(sys.stderr.fileno())

        self.old_stdout = sys.stdout
        self.old_stderr = sys.stderr

        os.dup2(self.outnull_file.fileno(), self.old_stdout_fileno_undup)
        os.dup2(self.errnull_file.fileno(), self.old_stderr_fileno_undup)

        sys.stdout = self.outnull_file
        sys.stderr = self.errnull_file
        return self

    def __exit__(self, *_):
        sys.stdout = self.old_stdout
        sys.stderr = self.old_stderr

        os.dup2(self.old_stdout_fileno, self.old_stdout_fileno_undup)
        os.dup2(self.old_stderr_fileno, self.old_stderr_fileno_undup)

        os.close(self.old_stdout_fileno)
        os.close(self.old_stderr_fileno)

        self.outnull_file.close()
        self.errnull_file.close()


def subprocess_main(transmit, stop_process):
    with suppress_stdout_stderr():
        import time

        yup = ['yuuuup', 'yuuuuup', 'yeaup', 'yeoop']
        nope = ['noooooooe', 'noooope', 'nope', 'nope']
        mesg = 0
        i = 0

        while True:
            i = i % len(yup)
            if transmit.poll():
                mesg = transmit.recv()
            if mesg == 'Yup':
                transmit.send(yup[i])
            if mesg == 'Nope':
                transmit.send(nope[i])
            if stop_process.wait(0):
                break
            i += 1
            time.sleep(2)


class SubProcess:
    def __init__(self, main):
        """
        Handles forking, stopping and communication with a subprocess
        :param main: subprocess method to run method signature is

            def main(transmit, stop_process):
                transmit: is a multiprocess Pipe to send data to parent process
                stop_process: is multiprocess Event to set when you want the process to exit
        """
        self.main = main
        self.recv, self.transmit = None, None
        self.stop_process = None
        self.proc = None

    def fork(self):
        """
        Forks and starts the subprocess
        """
        self.recv, self.transmit = Pipe(duplex=True)
        self.stop_process = Event()
        self.proc = Process(target=self.main, args=(self.transmit, self.stop_process))
        self.proc.start()

    def write_pipe(self, item):
        self.recv.send(item)

    def read_pipe(self):
        """
        Reads data sent by the process into a list and returns it
        :return:
        """
        item = []
        if self.recv is not None:
            try:
                while self.recv.poll():
                    item += [self.recv.recv()]
            except:
                pass
        return item

    def stop(self):
        """
        Sets the event to tell the process to exit.
        note: this is co-operative multi-tasking, the process must respect the flag or this won't work!
        """
        self.stop_process.set()
        self.proc.join()


class UrwidFrontend:
    def __init__(self, subprocess_main):
        """
        Urwid frontend to control the subprocess and display it's output
        """
        self.title = 'Urwid Frontend Demo'
        self.choices = 'Start Subprocess|Quit'.split('|')
        self.response = None
        self.item = deque(maxlen=10)
        self.event_loop = urwid.SelectEventLoop()

        # start the heartbeat
        self.event_loop.alarm(0, self.heartbeat)
        self.main = urwid.Padding(self.main_menu(), left=2, right=2)
        self.top = urwid.Overlay(self.main, urwid.SolidFill(u'\N{MEDIUM SHADE}'),
                                 align='center', width=('relative', 60),
                                 valign='middle', height=('relative', 60),
                                 min_width=20, min_height=9)

        self.loop = urwid.MainLoop(self.top, palette=[('reversed', 'standout', ''), ], event_loop=self.event_loop)

        self.subprocess = SubProcess(subprocess_main)

    def exit_program(self, button):
        raise urwid.ExitMainLoop()

    def main_menu(self):
        body = [urwid.Text(self.title), urwid.Divider()]
        for c in self.choices:
            button = urwid.Button(c)
            urwid.connect_signal(button, 'click', self.handle_button, c)
            body.append(urwid.AttrMap(button, None, focus_map='reversed'))
        return urwid.ListBox(urwid.SimpleFocusListWalker(body))

    def subproc_menu(self):
        self.response = urwid.Text('Waiting ...')
        body = [self.response, urwid.Divider()]
        choices = ['Yup', 'Nope', 'Stop Subprocess']
        for c in choices:
            button = urwid.Button(c)
            urwid.connect_signal(button, 'click', self.handle_button, c)
            body.append(urwid.AttrMap(button, None, focus_map='reversed'))
        listbox = urwid.ListBox(urwid.SimpleFocusListWalker(body))
        return listbox

    def update_subproc_menu(self, text):
        self.response.set_text(text)

    def handle_button(self, button, choice):
        if choice == 'Start Subprocess':
            self.main.original_widget = self.subproc_menu()
            self.subprocess.fork()
            self.item = deque(maxlen=10)

        if choice == 'Stop Subprocess':
            self.subprocess.stop()
            self.main.original_widget = self.main_menu()

        if choice == 'Quit':
            self.exit_program(button)

        if choice == 'Yup':
            self.subprocess.write_pipe('Yup')

        if choice == 'Nope':
            self.subprocess.write_pipe('Nope')

    def heartbeat(self):
        """
        heartbeat that runs 24 times per second
        """

        # read from the process
        self.item.append(self.subprocess.read_pipe())

        # display it
        if self.response is not None:
            self.update_subproc_menu(['Subprocess started\n', f'{self.item}\n', ])
            self.loop.draw_screen()

        # set the next beat
        self.event_loop.alarm(1 / 24, self.heartbeat)

    def run(self):
        self.loop.run()


if __name__ == "__main__":

    app = UrwidFrontend(subprocess_main)
    app.run()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74052836

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档