首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理功能,不写入文件或打印

多处理功能,不写入文件或打印
EN

Stack Overflow用户
提问于 2018-12-30 16:23:07
回答 1查看 359关注 0票数 1

我正在开发一个Raspberry (3 B+),制作一个数据收集设备,并试图生成一个进程来记录输入的数据并将其写入文件中。我有一个函数用于写作,当我直接调用它时,它工作得很好。

然而,当我使用多进程方法调用它时,似乎什么都没有发生。我可以在Linux中的任务监视器中看到,进程实际上是生成的,但是没有文件被写入,当我试图向它传递一个标志来关闭它时,它不起作用,这意味着我最终终止了进程,似乎什么事情都没有发生。

我经历过很多次,却看不出我做错了什么,还有人吗?如果是相关的话,这些都是父类中的函数,其中一个函数是作为线程生成另一个函数的。

我正在使用的代码:

代码语言:javascript
复制
from datetime import datetime, timedelta
import csv
from drivers.IMU_SEN0 import IMU_SEN0
import multiprocessing, os

class IMU_data_logger:
    _output_filename = ''
    _csv_headers = []
    _accelerometer_headers = ['Accelerometer X','Accelerometer    Y','Accelerometer Z']
    _gyroscope_headers = ['Gyroscope X','Gyroscope Y','Gyroscope Z']
    _magnetometer_headers = ['Bearing']
    _log_accelerometer = False
    _log_gyroscope= False
    _log_magnetometer = False
    IMU = None
    _writer=[]
    _run_underway = False
    _process=[]
    _stop_value = 0

def __init__(self,output_filename='/home/pi/blah.csv',log_accelerometer = True,log_gyroscope= True,log_magnetometer = True):
    """data logging device
    NOTE! Multiple instances of this class should not use the same IMU devices simultaneously!"""        
    self._output_filename = output_filename
    self._log_accelerometer = log_accelerometer
    self._log_gyroscope = log_gyroscope
    self._log_magnetometer = log_magnetometer

def __del__(self):
    # TODO Update this
    if self._run_underway: # If there's still a run underway, end it first
        self.end_recording()

def _set_up(self):        
    self.IMU = IMU_SEN0(self._log_accelerometer,self._log_gyroscope,self._log_magnetometer)
    self._set_up_headers()

def _set_up_headers(self):
    """Set up the headers of the CSV file based on the header substrings at top and the input flags on what will be measured"""
    self._csv_headers = []
    if self._log_accelerometer is not None:
        self._csv_headers+= self._accelerometer_headers
    if self._log_gyroscope is not None:
        self._csv_headers+= self._gyroscope_headers
    if self._log_magnetometer is not None:
        self._csv_headers+= self._magnetometer_headers


def _record_data(self,frequency,stop_value):
    self._set_up() #Run setup in thread

    """Record data function, which takes a recording frequency, in herz, as an input"""
    previous_read_time=datetime.now()-timedelta(1,0,0)
    self._run_underway = True # Note that a run is now going
    Period = 1/frequency # Period, in seconds, of a recording based on the input frequency
    print("Writing output data to",self._output_filename)

    with open(self._output_filename,'w',newline='') as outcsv:
        self._writer = csv.writer(outcsv)
        self._writer.writerow(self._csv_headers) # Write headers to file

        while stop_value.value==0: # While a run continues
            if datetime.now()-previous_read_time>=timedelta(0,1,0): # If we've waited a period, collect the data; otherwise keep looping
                print("run underway value",self._run_underway)
            if datetime.now()-previous_read_time>=timedelta(0,Period,0): # If we've waited a period, collect the data; otherwise keep looping
                previous_read_time = datetime.now() # Update previous readtime
                next_row = []
                if self._log_accelerometer:
                    # Get values in m/s^2
                    axes = self.IMU.read_accelerometer_values()
                    next_row += [axes['x'],axes['y'],axes['z']]

                if self._log_gyroscope:
                    # Read gyro values
                    gyro = self.IMU.read_gyroscope_values()
                    next_row += [gyro['x'],gyro['y'],gyro['z']]

                if self._log_magnetometer:
                    # Read magnetometer value
                    b= self.IMU.read_magnetometer_bearing()
                    next_row += b

                self._writer.writerow(next_row)

        # Close the csv when done
        outcsv.close()

def start_recording(self,frequency_in_hz):        
    # Create recording process
    self._stop_value = multiprocessing.Value('i',0)
    self._process = multiprocessing.Process(target=self._record_data,args=(frequency_in_hz,self._stop_value))

    # Start recording process
    self._process.start()
    print(datetime.now().strftime("%H:%M:%S.%f"),"Data logging process spawned")
    print("Logging Accelerometer:",self._log_accelerometer)
    print("Logging Gyroscope:",self._log_gyroscope)
    print("Logging Magnetometer:",self._log_magnetometer)     
    print("ID of data logging process: {}".format(self._process.pid))

def end_recording(self,terminate_wait = 2):
    """Function to end the recording multithread that's been spawned.
    Args: terminate_wait: This is the time, in seconds, to wait after attempting to shut down the process before terminating it."""
    # Get process id
    id = self._process.pid

    # Set stop event for process
    self._stop_value.value = 1

    self._process.join(terminate_wait) # Wait two seconds for the process to terminate
    if self._process.is_alive(): # If it's still alive after waiting
        self._process.terminate()
        print(datetime.now().strftime("%H:%M:%S.%f"),"Process",id,"needed to be terminated.")
    else:
        print(datetime.now().strftime("%H:%M:%S.%f"),"Process",id,"successfully ended itself.")

====================================================================

答案:对于这里的任何后续人员来说,问题都在于我使用VS代码调试器,它显然不适用于多处理,并且在某种程度上阻碍了派生进程的成功。感谢下面的Tomasz帮助我解决问题,并最终发现我的愚蠢之处。我们非常感谢你们的帮助!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-12-30 18:15:53

我看不出你的代码有什么问题:

首先,stop_value == 0将不作为multiprocess.Value('i',0) != 0工作,将该行更改为

代码语言:javascript
复制
while stop_value.value == 0

第二,永远不要更新previous_read_time,这样它就会尽可能快地写入读数,很快就会耗尽磁盘。

第三,尝试使用time.sleep() --您正在做的事情叫做繁忙循环--这很糟糕,这是不必要地浪费CPU周期。

第四,以self._stop_value = 1结尾可能行不通--必须有其他方法来设置该值--可能是self._stop_value.value =1。

下面是基于您提供的代码的示例代码,这些代码工作得很好:

代码语言:javascript
复制
import csv
import multiprocessing
import time
from datetime import datetime, timedelta
from random import randint


class IMU(object):

    @staticmethod
    def read_accelerometer_values():
        return dict(x=randint(0, 100), y=randint(0, 100), z=randint(0, 10))


class Foo(object):

    def __init__(self, output_filename):
        self._output_filename = output_filename
        self._csv_headers = ['xxxx','y','z']
        self._log_accelerometer = True
        self.IMU = IMU()

    def _record_data(self, frequency, stop_value):
        #self._set_up()  # Run setup functions for the data collection device and store it in the self.IMU variable

        """Record data function, which takes a recording frequency, in herz, as an input"""
        previous_read_time = datetime.now() - timedelta(1, 0, 0)
        self._run_underway = True  # Note that a run is now going
        Period = 1 / frequency  # Period, in seconds, of a recording based on the input frequency
        print("Writing output data to", self._output_filename)

        with open(self._output_filename, 'w', newline='') as outcsv:
            self._writer = csv.writer(outcsv)
            self._writer.writerow(self._csv_headers)  # Write headers to file

            while stop_value.value == 0:  # While a run continues
                if datetime.now() - previous_read_time >= timedelta(0, 1,
                                                                    0):  # If we've waited a period, collect the data; otherwise keep looping
                    print("run underway value", self._run_underway)
                if datetime.now() - previous_read_time >= timedelta(0, Period,
                                                                    0):  # If we've waited a period, collect the data; otherwise keep looping
                    next_row = []
                    if self._log_accelerometer:
                        # Get values in m/s^2
                        axes = self.IMU.read_accelerometer_values()
                        next_row += [axes['x'], axes['y'], axes['z']]

                    previous_read_time = datetime.now()
                    self._writer.writerow(next_row)

            # Close the csv when done
            outcsv.close()

    def start_recording(self, frequency_in_hz):
        # Create recording process
        self._stop_value = multiprocessing.Value('i', 0)
        self._process = multiprocessing.Process(target=self._record_data, args=(frequency_in_hz, self._stop_value))

        # Start recording process
        self._process.start()
        print(datetime.now().strftime("%H:%M:%S.%f"), "Data logging process spawned")
        print("ID of data logging process: {}".format(self._process.pid))

    def end_recording(self, terminate_wait=2):
        """Function to end the recording multithread that's been spawned.
        Args: terminate_wait: This is the time, in seconds, to wait after attempting to shut down the process before terminating it."""
        # Get process id
        id = self._process.pid

        # Set stop event for process
        self._stop_value.value = 1

        self._process.join(terminate_wait)  # Wait two seconds for the process to terminate
        if self._process.is_alive():  # If it's still alive after waiting
            self._process.terminate()
            print(datetime.now().strftime("%H:%M:%S.%f"), "Process", id, "needed to be terminated.")
        else:
            print(datetime.now().strftime("%H:%M:%S.%f"), "Process", id, "successfully ended itself.")


if __name__ == '__main__':
    foo = Foo('/tmp/foometer.csv')
    foo.start_recording(20)
    time.sleep(5)
    print('Ending recording')
    foo.end_recording()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53979322

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档