我已经设置了一个Google云存储存储桶来向发布/订阅主题发送通知:
gsutil notification create -t my-topic -f json gs://test-bucket
我已经创建了此主题的订阅,以将消息推送到云函数端点:
gcloud pubsub subscriptions create my-sub --topic my-topic
云函数部署方式为:
gcloud functions deploy promo_received --region europe-west1 --runtime python37 --trigger-topic my-topic
该函数(现在)的目的是检查在测试存储桶中创建的文件是否与特定的文件名匹配,如果匹配,则向Slack发出一条消息。目前,该函数如下所示:
def promo_received(data):
date_str = datetime.today().strftime('%Y%m%d')
filename = json.loads(data)["name"]
bucket = json.loads(data)["bucket"]
if filename == 'PROM_DTLS_{}.txt.gz'.format(date_str):
msg = ":heavy_check_mark: *{}* has been uploaded to *{}*. Awaiting instructions.".format(filename, bucket)
post_to_slack(url, msg)当我通过删除一个名为PROM_DTLS_20190913.txt.gz的文件进行测试时,我可以看到函数被触发,但是它崩溃了,并显示了两个错误:
TypeError: promo_received() takes 1 positional argument but 2 were given
TypeError: the JSON object must be str, bytes or bytearray, not LocalProxy
这是我第一次尝试这样做,我不确定从哪里开始故障排除。任何帮助都将不胜感激!
发布于 2019-09-13 20:12:15
您需要为您的函数添加context as参数,这将解决第一个错误:
def promo_received(data, context):
[...]此外,您不需要使用json.loads来检索文件或存储桶的名称:
data['name']
data['bucket']这应该会消除第二个错误。
请查看Google Cloud Storage Triggers文档页面中的示例
发布于 2019-09-17 05:44:54
要编写Python Cloud函数,请查看this example。请注意Cloud Storage serializes the object into a utf-8 JSON string,它是云函数,然后进行base64编码。因此,您需要首先对有效负载进行base64解码,然后对其进行UTF8解码,然后对其进行JSON解析。
def promo_received(event, context):
obj = json.loads(base64.b64decode(event['data']).decode('utf-8'))
filename = obj[“name”]
bucket = obj[“bucket”]
# the rest of your code goes herehttps://stackoverflow.com/questions/57922832
复制相似问题