I wrote some Python classes to process files as they are created, using Python's watchdog module. Whenever a new-file event is triggered, a process should be started to handle the processing of that file. Files arrive from different sources at the same time, so I want several processes to work on files "simultaneously".
According to syslog, only one process is ever created, so there is no multiprocessing. I only see lines like
Sep 21 13:53:02 host1 python3.9[6704]: ...
in the log, all with exactly the same PID (6704 in this case).
Can anyone tell me what I am doing wrong here?
Contents of fileMonitor.py:
import time
from watchdog.observers.polling import PollingObserver
from watchdog.events import RegexMatchingEventHandler
import converter
##
## \brief Class to handle monitored events
##
class LogFileEventHandler(RegexMatchingEventHandler):
    MONITOR_REGEX = [r'.*\.(gz|txt)$'] # watch out for new "*.gz" or "*.txt"-files only
    IGNORE_REGEX = [r'.*/archive/*']   # ignore events below path "*/archive/*"

    ###
    ### Public methods
    ###
    def __init__(self):
        super().__init__(
            regexes=self.MONITOR_REGEX,
            ignore_regexes=self.IGNORE_REGEX,
            ignore_directories=True,
            case_sensitive=False)
        self.cm = converter.ConverterManager()

    def on_created(self, event):
        self.cm.convertMemory(event.src_path)

    def on_moved(self, event):
        self.cm.convertMemory(event.dest_path)
##
## \brief Class to monitor changes in filesystem
## \note Has to use PollingObserver due to network-filesystem.
## There is no OS-API supporting notification on network-filesystem changes.
##
class LogFileMonitor:
    ###
    ### Public methods
    ###
    def __init__(self, monitorPath):
        self.monitorPath = monitorPath       # Path to monitor
        self.handler = LogFileEventHandler() # Handler for events occurred
        self.observer = PollingObserver()    # Method for monitoring

    def run(self):
        self._start()                        # Prepare observer
        try:
            while True:
                time.sleep(1)                # Suspend for some time
        except KeyboardInterrupt:
            self._stop()                     # Terminate observer

    ###
    ### Private methods
    ###
    def _start(self):
        self.observer.schedule(              # prepare observer
            event_handler=self.handler,
            path=self.monitorPath,
            recursive=True,
        )
        self.observer.start()                # start observer

    def _stop(self):
        self.observer.stop()                 # stop observer
        self.observer.join()                 # wait till observer terminated

Contents of converter.py:
import time
import concurrent.futures
##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:
    ###
    ### Public methods
    ###
    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        time.sleep(30) # Do some long taking stuff here

##
## \brief Class to manage converter workers
##
class ConverterManager:
    ###
    ### Public methods
    ###
    def __init__(self):
        print('Created new instance of ConverterManager')

    def convertMemory(self, logFile):
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: # create a new process from pool
            executor.submit(self._task(logFile)) # start worker-process

    ###
    ### Private methods
    ###
    def _task(self, logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()

Solved!
Thank you for your hints, they almost solved my problem. In addition, I had to declare the "_task" method static to finally make it work.
Below is the modified code that works for me:
import time
import concurrent.futures
##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:
    ###
    ### Public methods
    ###
    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        print(f'Started process for {self.logFile}')
        time.sleep(10) # Do some long taking stuff here
        print(f'Terminated process for {self.logFile}')

##
## \brief Class to manage converter workers
##
class ConverterManager:
    executor = None

    ###
    ### Public methods
    ###
    def __init__(self):
        print('Created new instance of ConverterManager')
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

    def convertMemory(self, logFile):
        self.executor.submit(self._task, logFile) # start worker-process

    ###
    ### Private methods
    ###
    @staticmethod
    def _task(logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()

if __name__ == '__main__':
    cm = ConverterManager()
    for i in range(30):
        cm.convertMemory(i)

Posted on 2021-09-21 14:31:25
This statement:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    executor.submit(self._task(logFile))

is roughly equivalent to:

executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
try:
    executor.submit(self._task(logFile))
finally:
    executor.shutdown()

This is described in the concurrent.futures.Executor.shutdown documentation. Because each executor has only a single task submitted to it and is then immediately shut down (which blocks the caller until the task has finished its work), the tasks never get a chance to run concurrently.
An alternative approach is to create a single executor shared by all tasks, and call shutdown on it at the end of the program.
Separately from that, this call:

executor.submit(self._task(logFile))

is equivalent to:

result = self._task(logFile) # result becomes None
executor.submit(None)

So it actually does the work immediately, in the calling process, without ever submitting it to the executor. Instead, you probably want:

executor.submit(self._task, logFile)

...as described in the concurrent.futures.Executor.submit documentation. Pass it the function you want the child process to call, plus all the arguments you want it called with, but, importantly, do not call it yourself.
https://stackoverflow.com/questions/69269315