首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用aws-iot- device -sdk-python-v2发布到设备卷影时出现问题

使用aws-iot- device -sdk-python-v2发布到设备卷影时出现问题
EN

Stack Overflow用户
提问于 2021-10-24 11:29:12
回答 1查看 161关注 0票数 0

在使用aws iot device sdk for python v2 (v1.7.1)的python应用程序中,我遇到了一个无法更新设备阴影的问题。

启动程序后,DeviceShadowManager将尝试获取最新的阴影状态并在本地进行设置。如果存在delta状态,则DeviceShadowManager将合并最后的reported状态和delta状态,并发布它。这是可行的。然而,当管理器订阅更新时,在初始设置之后,我遇到了一个错误,当desired状态改变时,管理器不能更新reported状态。下面是错误:

代码语言:javascript
复制
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
    callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'

我看了看the source,但就是不明白为什么会引发TypeError,特别是因为这种情况似乎是由tryexcept块处理的,还是我完全搞错了?

错误的来源:

代码语言:javascript
复制
 if callback:
    def callback_wrapper(topic, payload, dup, qos, retain):
        try:
            callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
        except TypeError:
            # This callback used to have fewer args.
            # Try again, passing only those those args, to cover case where
            # user function failed to take forward-compatibility **kwargs.
            callback(topic=topic, payload=payload) # this is line 506

下面你可以找到我的代码和程序的日志。

这个数据类表示阴影:

代码语言:javascript
复制
from dataclasses import dataclass

@dataclass
class DeviceShadow:
    score_threshold: float = 0.6
    minimum_distance: int = 150

阴影由DeviceShadowManager管理。其中大部分是基于前面提到的repository中的shadow sample

代码语言:javascript
复制
from dataclasses import asdict
from queue import Queue
from threading import Lock

from awscrt import mqtt
from awsiot import iotshadow
from awsiot.iotshadow import IotShadowClient

from app.device_shadow.device_shadow import DeviceShadow, from_json as device_shadow_from_json
from app.models import log

SHADOW_VALUE_DEFAULT = DeviceShadow()


class DeviceShadowManager:
    _shadow_client: IotShadowClient
    shadow_value: DeviceShadow = DeviceShadow()

    _lock = Lock()
    _thing_name: str

    def __init__(self, thing_name: str, mqtt_connection: mqtt.Connection):
        self._thing_name = thing_name
        self._shadow_client = iotshadow.IotShadowClient(mqtt_connection)

        update_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_accepted(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_update_shadow_accepted  # omitted
        )

        update_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_update_shadow_rejected(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_update_shadow_rejected  # omitted
        )

        # Wait for subscriptions to succeed
        update_accepted_subscribed_future.result(60)
        update_rejected_subscribed_future.result(60)

        log.info("Subscribing to Get responses...")
        get_accepted_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_accepted(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_get_shadow_accepted)

        get_rejected_subscribed_future, _ = self._shadow_client.subscribe_to_get_shadow_rejected(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=self._thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_get_shadow_rejected)  # omitted

        # Wait for subscriptions to succeed
        get_accepted_subscribed_future.result()
        get_rejected_subscribed_future.result()

        log.info("Subscribing to Delta events...")
        delta_subscribed_future, _ = self._shadow_client.subscribe_to_shadow_delta_updated_events(
            request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(
                thing_name=self._thing_name
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=self.on_shadow_delta_updated)

        # Wait for subscription to succeed
        delta_subscribed_future.result()

        # From here on out the rest runs asynchronously.

        # Issue request for shadow's current value.
        # The response will be received by the on_get_accepted() callback
        with self._lock:
            publish_get_future = self._shadow_client.publish_get_shadow(
                request=iotshadow.GetShadowRequest(
                    thing_name=self._thing_name,
                ),
                qos=mqtt.QoS.AT_LEAST_ONCE
            )

        # Ensure that publish succeeds
        publish_get_future.result()

    def on_get_shadow_accepted(self, response: iotshadow.GetShadowResponse) -> None:
        log.info("Finished getting initial shadow value.")

        if response.state and response.state.delta:
            if not response.state.reported:
                response.state.reported = {}
            merged_state = self.merge_states(response.state.delta, response.state.desired)
            return self.set_desired(device_shadow_from_json(merged_state))

        if response.state and response.state.reported:
            return self.set_local(device_shadow_from_json(response.state.reported))

        self.set_desired(SHADOW_VALUE_DEFAULT)
        return

    def on_shadow_delta_updated(self, delta: iotshadow.ShadowDeltaUpdatedEvent) -> None:
        if delta.state:
            if delta.state is None:
                log.info("Delta reports that nothing is set. Setting defaults...")
                self.set_desired(SHADOW_VALUE_DEFAULT)
                return

            log.info("Delta reports that desired shadow is '{}'. Changing local shadow...".format(delta.state))
            self.set_desired(self.merge_states(delta.state, self.shadow_value))

        else:
            log.info("Delta did not report a change")

    @staticmethod
    def merge_states(delta: dict, reported: DeviceShadow):
        for key, value in delta.items():
            reported[key] = value

        return reported

    def set_local(self, value: DeviceShadow) -> None:
        with self._lock:
            self.shadow_value = value

    def set_desired(self, new_value: DeviceShadow) -> None:
        with self._lock:
            if self.shadow_value == new_value:
                log.debug("Local shadow is already '{}'.".format(new_value))
                return

            log.debug("Changing local shadow to '{}'.".format(new_value))
            self.shadow_value = new_value

            log.debug("Updating reported shadow  to '{}'...".format(new_value))
            request = iotshadow.UpdateShadowRequest(
                thing_name=self._thing_name,
                state=iotshadow.ShadowState(
                    desired=asdict(new_value),
                    reported=asdict(new_value),
                ),
            )

            self._shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)

您可以在下面找到日志:

代码语言:javascript
复制
DEBUG:app.mqtt:Connecting to xxxxxxxxxxxxxx-ats.iot.eu-central-1.amazonaws.com with client ID '80d8bc54-971e-0e65-a537-37d14a3cb630'...
INFO:app.models:Subscribing to Get responses...
INFO:app.models:Subscribing to Delta events...
INFO:app.models:Finished getting initial shadow value.
DEBUG:app.models:Changed local shadow to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'.
DEBUG:app.models:Updating reported shadow  to 'DeviceShadow(score_threshold=0.7, minimum_distance=1503)'...
INFO:app.models:Update request published.
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.
INFO:app.models:Delta reports that desired shadow is '{'minimum_distance': 15035}'. Changing local shadow...
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/Users/tom/.../lib/python3.9/site-packages/awscrt/mqtt.py", line 506, in callback_wrapper
    callback(topic=topic, payload=payload)
TypeError: callback_wrapper() missing 3 required positional arguments: 'dup', 'qos', and 'retain'
DEBUG:app.models:Finished updating reported shadow to '{'score_threshold': 0.7, 'minimum_distance': 1503}'.

如你所见,堆栈跟踪非常短,有没有更好的调试方法?你知道为什么它会给我这个特殊的错误,以及如何解决它吗?感谢所有的帮助!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-25 09:24:01

我很确定问题出在

代码语言:javascript
复制
@staticmethod
def merge_states(delta: dict, reported: DeviceShadow):
   for key, value in delta.items():
      reported[key] = value
   return reported

其中,对reported参数的__setitem__调用会引发一个TypeError,因为报告的参数是一个不支持项分配的DeviceShadow数据类对象。

如果您想设置数据类的字段,其中包含字段名的字符串,则可以使用setattr(reported, key, value)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69696291

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档