Python 3.9 watchdog: starting a process for each event

Stack Overflow user
Asked 2021-09-21 12:57:51

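I wrote some Python classes that process files as soon as they are created, using Python's watchdog module. Whenever a new-file event is triggered, a process should be started to handle that file. Files arrive from several sources at the same time, so I want multiple processes to work on files "simultaneously".

According to syslog, only one process is ever created, so there is no multiprocessing. I only see lines like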

Sep 21 13:53:02 host1 python3.9[6704] : ...

in the log, all with exactly the same PID (6704 in this example).

Can anyone tell me what I am doing wrong here?

Contents of fileMonitor.py

import time
from watchdog.observers.polling import PollingObserver
from watchdog.events import RegexMatchingEventHandler

import converter

##
## \brief Class to handle monitored events
##
class LogFileEventHandler(RegexMatchingEventHandler):

    MONITOR_REGEX = [r'.*\.(gz|txt)$']  # watch out for new "*.gz" or "*.txt"-files only
    IGNORE_REGEX = [r'.*/archive/*']    # ignore events below path "*/archive/*"

    ###
    ### Public methods
    ###

    def __init__(self):
        super().__init__(
            regexes=self.MONITOR_REGEX,
            ignore_regexes=self.IGNORE_REGEX,
            ignore_directories=True,
            case_sensitive=False)
        self.cm = converter.ConverterManager()

    def on_created(self, event):
        self.cm.convertMemory(event.src_path)

    def on_moved(self, event):
        self.cm.convertMemory(event.dest_path)

##
## \brief Class to monitor changes in filesystem
## \note Has to use PollingObserver due to network-filesystem.
##       There is no OS-API supporting notification on network-filesystem changes.
##
class LogFileMonitor:

    ###
    ### Public methods
    ###

    def __init__(self, monitorPath):
        self.monitorPath = monitorPath                     # Path to monitor
        self.handler = LogFileEventHandler()               # Handler for events occurred
        self.observer = PollingObserver()                  # Method for monitoring

    def run(self):
        self._start()                                      # Prepare observer
        try:
            while True:
                time.sleep(1)                              # Suspend for some time
        except KeyboardInterrupt:
            self._stop()                                   # Terminate observer

    ###
    ### Private methods
    ###

    def _start(self):
        self.observer.schedule(                            # prepare observer
            event_handler=self.handler,
            path=self.monitorPath,
            recursive=True,
        )
        self.observer.start()                              # start observer

    def _stop(self):
        self.observer.stop()                               # stop observer
        self.observer.join()                               # wait till observer terminated

Contents of converter.py

import time
import concurrent.futures

##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:

    ###
    ### Public methods
    ###

    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        time.sleep(30) # Do some long taking stuff here
    
##
## \brief Class to manage converter workers
##
class ConverterManager:

    ###
    ### Public methods
    ###

    def __init__(self):
        print('Created new instance of ConverterManager')

    def convertMemory(self, logFile):
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: # create a new process from pool
            executor.submit(self._task(logFile))                                # start worker-process

    ###
    ### Private methods
    ###

    def _task(self, logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()

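Solved!

Thanks for your hints, they almost solved my problem. In fact, I also had to declare the "_task" method static to finally make it work.

Here is the modified code that works for me: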

import time
import concurrent.futures

##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:

    ###
    ### Public methods
    ###

    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        print(f'Started process for {self.logFile}')
        time.sleep(10) # Do some long taking stuff here
        print(f'Terminated process for {self.logFile}')

##
## \brief Class to manage converter workers
##
class ConverterManager:

    executor = None

    ###
    ### Public methods
    ###

    def __init__(self):
        print('Created new instance of ConverterManager')
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

    def convertMemory(self, logFile):
        self.executor.submit(self._task, logFile)                                # start worker-process

    ###
    ### Private methods
    ###
    @staticmethod
    def _task(logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()


if __name__ == '__main__':
    cm = ConverterManager()

    for i in range(30):
        cm.convertMemory(i)
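
A likely reason the static method was needed (an inference, not something stated in the thread): ProcessPoolExecutor pickles the submitted callable in order to send it to a worker, and pickling a bound method such as self._task also pickles self, which now holds the executor itself. The sketch below checks this with pickle alone, without starting any worker process. The names Manager, bound_task, static_task, can_pickle and check are hypothetical stand-ins, not part of the original code.

```python
import pickle
from concurrent.futures import ProcessPoolExecutor


class Manager:
    """Hypothetical stand-in for ConverterManager: it owns an executor."""

    def __init__(self):
        self.executor = ProcessPoolExecutor(max_workers=1)

    def bound_task(self, log_file):
        # Bound method: pickling it requires pickling `self` as well.
        return log_file

    @staticmethod
    def static_task(log_file):
        # Plain function: pickled by reference (module and qualified name).
        return log_file


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def check():
    """Report whether the bound and the static variant can be pickled."""
    manager = Manager()
    try:
        return can_pickle(manager.bound_task), can_pickle(Manager.static_task)
    finally:
        manager.executor.shutdown()
```

With a bound method the failure does not show up at the submit call; the exception ends up stored on the returned future, so it goes unnoticed unless result() or exception() is called on that future.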

1 Answer

Stack Overflow user

Accepted answer

Posted 2021-09-21 14:31:25

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    executor.submit(self._task(logFile))                               

This statement is roughly equivalent to:

executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
try:
    executor.submit(self._task(logFile))
finally:
    executor.shutdown()

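This is described in the documentation for concurrent.futures.Executor.shutdown. Because each executor has only one task submitted to it and is then shut down immediately (which blocks the caller until that task has finished), the tasks never get a chance to run concurrently.

An alternative is to create a single executor shared by all tasks and to call shutdown at the end of the program.

While you are at it, there is a separate problem with this line: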
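
As a rough sketch of the difference between the two patterns (not the asker's code), the hypothetical helpers per_task_executor and shared_executor below time each one. ThreadPoolExecutor is swapped in by default so the sketch runs without spawning processes; it implements the same Executor interface. Passing ProcessPoolExecutor as executor_cls should behave the same way, assuming the script is run with the usual `if __name__ == '__main__':` guard.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    """Stand-in for the long-running conversion."""
    time.sleep(seconds)
    return seconds


def per_task_executor(n_tasks, seconds, executor_cls=ThreadPoolExecutor):
    """One executor per task, as in the question: tasks run one after another."""
    start = time.perf_counter()
    for _ in range(n_tasks):
        with executor_cls(max_workers=4) as executor:
            executor.submit(slow_task, seconds)
        # Leaving the with-block calls shutdown(wait=True), so the loop
        # blocks here until the single submitted task has finished.
    return time.perf_counter() - start


def shared_executor(n_tasks, seconds, executor_cls=ThreadPoolExecutor):
    """One executor for all tasks: up to max_workers tasks overlap."""
    start = time.perf_counter()
    executor = executor_cls(max_workers=4)
    try:
        futures = [executor.submit(slow_task, seconds) for _ in range(n_tasks)]
        for future in futures:
            future.result()  # wait for completion and surface any exception
    finally:
        executor.shutdown()  # shut down once, after all work was submitted
    return time.perf_counter() - start
```

With three tasks of 0.2 seconds each, the first helper needs at least 0.6 seconds, while the second should finish in roughly the time of a single task.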

executor.submit(self._task(logFile))

...is equivalent to:

result = self._task(logFile)   # result becomes None
executor.submit(None)

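So it actually does the work immediately, in the calling process, without ever submitting it to the executor.

Instead, you probably want this: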

executor.submit(self._task, logFile)

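...as described in the documentation for concurrent.futures.Executor.submit. Pass it the function you want the subprocess to call, plus all the arguments you want it called with, but, importantly, do not call it yourself.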
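
The two call styles can be compared in a small, hedged sketch. To keep it deterministic and free of process start-up concerns, it uses ThreadPoolExecutor (same submit signature) and records thread identifiers instead of PIDs; the function name demo and the task labels "wrong" and "right" are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo():
    """Show where the task runs for submit(task(x)) versus submit(task, x)."""
    ran_in = {}

    def task(name):
        # Record which thread actually executed the task.
        ran_in[name] = threading.get_ident()

    caller = threading.get_ident()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # task("wrong") is evaluated here, in the caller; its return value
        # (None) is what gets handed to submit().
        wrong = executor.submit(task("wrong"))
        # The function and its argument are passed separately, so the
        # worker is the one that calls task("right").
        right = executor.submit(task, "right")
        wrong_error = wrong.exception()  # the worker failed trying to call None
        right.result()
    return caller, ran_in, wrong_error
```

Calling demo() shows that the "wrong" task ran in the caller's own thread and that its future carries a TypeError, while the "right" task ran in the worker. With ProcessPoolExecutor the same reasoning applies to process IDs, which matches the single PID seen in the syslog output.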

Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/69269315
