首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >处理多个IO错误

处理多个IO错误
EN

Stack Overflow用户
提问于 2014-01-15 17:07:08
回答 3查看 96关注 0票数 0

我在课堂上执行了几个IO操作,但这些操作在IOError中经常失败。我想要做的是延迟几百毫秒,然后再试一次,直到成功或某些特定的超时。在继续/结束循环之前,如何确保每个单独的命令都成功?我假设有一种更好的方法来检查所有命令是否成功,而不是为每一项设置一个if语句和一个计数器。

下面我的当前代码经常在IOError中失败,并挂起应用程序的其余部分。

代码语言:javascript
复制
   def __init__(self):
      print("Pressure init.")
      self.readCoefficients()

   def readCoefficients(self):
      global a0_MSB;
      global a0_LSB;
      global b1_MSB;
      global b1_LSB;
      global b2_MSB;
      global b2_LSB;
      global c12_MSB;
      global c12_LSB;

      a0_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0);
      a0_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0);

      b1_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0);
      b1_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0);

      b2_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0);
      b2_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0);

      c12_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0);
      c12_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0);
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2014-01-15 17:20:46

如果您可以逐个读取所有文件,则可以使用一个简单的函数。

代码语言:javascript
复制
import time

# ...

def readCoefficients(self):
    global a0_MSB;
    global a0_LSB;
    global b1_MSB;
    global b1_LSB;
    global b2_MSB;
    global b2_LSB;
    global c12_MSB;
    global c12_LSB;

    max_retries = 15

    a0_MSB = self.readretry(Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0, max_retries)
    a0_LSB = self.readretry(Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0, max_retries)

    b1_MSB = self.readretry(Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0, max_retries)
    b1_LSB = self.readretry(Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0, max_retries)

    b2_MSB = self.readretry(Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0, max_retries)
    b2_LSB = self.readretry(Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0, max_retries)

    c12_MSB = self.readretry(Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0, max_retries)
    c12_LSB = self.readretry(Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0, max_retries)

    def readretry(self, address, max_retries):
        for i in range(max_retries):
            try:
                return Pressure.bus.read_byte_data(
                    Pressure.MPL115A2_ADDRESS,
                    address
                )
            except IOError as e:
                # print(e)
                time.sleep(0.1)
        else:
            raise IOError("Reading failed after multiple tries")

注意:您不应该使用globals,尤其是在类中。

票数 0
EN

Stack Overflow用户

发布于 2014-01-15 17:12:01

你是想独立地还是集体地重新尝试最后8行中的每一行?如果独立地想要创建一个小助手函数:

代码语言:javascript
复制
def retry_function(tries, function, *args, **kwargs):
    for try in range(tries):
        try:
            return function(*args, **kwargs)
        except IOError as e:
            time.sleep(.005)
    raise e   # will be the last error from inside the loop. be sure tries is at least 1 or this will be undefined!

那就这样说吧:

代码语言:javascript
复制
a0_MSB = retry_function(5, Pressure.bus.read_byte_data, Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0)

如果不是独立的,而是作为一个组,您可能仍然需要这个助手函数。但是您必须重写它以处理函数/参数的列表,或者传入另一个自定义函数

票数 2
EN

Stack Overflow用户

发布于 2014-01-15 18:04:54

这是另一种方法。此代码尝试读取所有地址,并保存失败地址。然后稍等,然后重试所有失败的地址,直到正确读取所有地址或超出允许重试次数为止。

代码语言:javascript
复制
def readCoefficients(self):
    (
        a0_MSB, a0_LSB,
        b1_MSB, b1_LSB,
        b2_MSB, b2_LSB,
        c12_MSB, c12_LSB) = self.mio_read(15,
            Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0
    )

def mio_read(self, max_retries, *addresses):
    # Create storage for results
    results = [None] * len(addresses)
    # Keep track of the index of a particular address in the list of results
    ios = list(enumerate(addresses))
    for i in range(max_retries):
        failedios = []
        for index, address in ios:
            try:
                results[index] = Pressure.bus.read_byte_data(
                    Pressure.MPL115A2_ADDRESS,
                    address
                )
            except IOError as e:
                # Place address in the queue for the next round
                failedios.append((index, address))
        # If all succeeded
        if len(failedios) == 0:
            return results
        # Time may be reduced as so was spent checking other addresses
        time.sleep(0.1)
        ios = failedios
    else:
        raise IOError(",".join((addr for ind, addr in failedios)))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21143778

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档