首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >更新azure-iot-hub (python)中的所有双胞胎设备

更新azure-iot-hub (python)中的所有双胞胎设备
EN

Stack Overflow用户
提问于 2019-10-31 21:15:59
回答 2查看 625关注 0票数 0

我正在尝试编写一个python程序,该程序仅通过获得集线器的连接字符串来更新给定IoT-hub的所有双胞胎设备。这里给出的代码:Get started with device twins (Python)不是最新的,会崩溃。有没有人在这方面有过成功的经验?

EN

回答 2

Stack Overflow用户

发布于 2019-11-01 13:09:14

除了@silent comment之外,Azure IoT集线器还为Import and Export IoT Hub device identities in bulk.内置了一个特殊的任务

此功能允许更新还包括报告属性的设备双胞胎。批量作业由REST API Create Import Export Job提交给服务。

blob文件中逐行描述了批量作业,下面一行是device1上的updateTwin示例

代码语言:javascript
复制
{ "id":"device1", "importMode":"updateTwin", "status":"enabled", "tags":{},"properties":{"desired":{ "key1":12},"reported":{ "key2":null, "key3":"abcd"} }}

下面的代码片段显示了如何调用此REST服务来更新inputBlobName中描述的所有设备双胞胎

代码语言:javascript
复制
import requests
import json
import time
import urllib
import hmac
import hashlib
import base64


# Example of the blob content for one device
# { "id":"device1", "importMode":"updateTwin", "status":"enabled", "tags":{},"properties":{"desired":{ "key1":12},"reported":{ "key2":null, "key3":"abcd"  } } }


# IoT Hub and Blob Storage
iothub_connection_string = "<IoTHubConnectionString>"
blobContainerUri = "<blobContainerUriSaS>"
inputBlobName = "<inputBlobName>"

def get_sas_token(resource_uri, sas_name, sas_value):  
    sas = base64.b64decode(sas_value.encode('utf-8'))
    expiry = str(int(time.time() + 10000))
    string_to_sign = (resource_uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
    return  "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(resource_uri, signature, expiry, sas_name)

# iothub_sas_token from connection string
cs = dict(map(lambda x: x.split('=',1), iothub_connection_string.split(';')))
iothub_namespace = cs["HostName"].split('.')[0]
sas_token = get_sas_token(cs["HostName"], cs["SharedAccessKeyName"], cs["SharedAccessKey"])

# REST API see doc: https://docs.microsoft.com/en-us/rest/api/iothub/digitaltwinmodel/service/createimportexportjob
uri = "https://{}.azure-devices.net/jobs/create?api-version=2018-06-30".format(iothub_namespace)
headers = { 'Authorization':sas_token, 'Content-Type':'application/json;charset=utf-8' }
payload = { "inputBlobContainerUri": blobContainerUri, "outputBlobContainerUri": blobContainerUri, "inputBlobName": inputBlobName, "type":"import" }
res = requests.post(uri, data = json.dumps(payload), headers = headers)

print(res)

# check if the job has been accepted
if res.status_code != 200: 
    quit()

# check the job status progress 
jobId = json.loads(res.content)["jobId"]
uri = "https://{}.azure-devices.net/jobs/{}?api-version=2018-06-30".format(iothub_namespace, jobId);
while(True): 
    time.sleep(2)    
    res = requests.get(uri, data = None, headers = headers)
    if  res.status_code == 200 and json.loads(res.content)["status"] == "running": 
        print(".", end="", flush=True)
    else: 
        break
print("Done")

可以使用Microsoft Azure Storage Explorer来创建blob容器、生成其sas地址等。

票数 1
EN

Stack Overflow用户

发布于 2019-11-04 16:05:53

通过@Roman Kiss帮助,我使用了create job应用程序接口请求--这不需要使用存储。

代码语言:javascript
复制
JOB_ID = {INSERT_JOB_ID}
CONNECTION_STRING = {INSERT_CONNECTION_STRING}
IOT_HUB_NAME = {INSERT_IOT_HUB_NAME}

def create_device_twin():
    return {"jobId": JOB_ID, "type": "scheduleUpdateTwin", "updateTwin": {"properties": {
        "desired": {INSERT_YOUR_DESIRED_PROERTIES}
    }, "etag": "*"}
     }

def get_sas_token():
    cs = dict(map(lambda x: x.split('=', 1), CONNECTION_STRING.split(';')))
    resource_uri = cs["HostName"]
    sas_name = cs["SharedAccessKeyName"]
    sas_value = cs["SharedAccessKey"]
    sas = base64.b64decode(sas_value.encode('utf-8'))
    expiry = str(int(time.time() + 10000))
    string_to_sign = (resource_uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(resource_uri, signature, expiry, sas_name)

if __name__ == '__main__':
    uri = "https://{}.azure-devices.net/jobs/v2/{}?api-version=2018-06-30".format(IOT_HUB_NAME, JOB_ID)
    sas_token = get_sas_token()
    headers = {'Authorization': sas_token, 'Content-Type': 'application/json;charset=utf-8'}
    res = requests.put(uri, headers=headers, data=json.dumps(create_device_twin()))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58644200

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档