我需要帮助将论点(conf params)传递给mwaa (气流)从lambda。Lmabda用于触发sqs事件上的守护进程。
在没有命令行参数的情况下,dag运行良好。
import boto3
import http.client
import base64
import ast
mwaa_env_name = 'dflow_dev_2'
dag_name = 'tpda_test'
mwaa_cli_command = 'dags trigger'
client = boto3.client('mwaa')
def lambda_handler(event, context):
# get web token
mwaa_cli_token = client.create_cli_token(
Name=mwaa_env_name
)
conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
payload = "dags trigger " + dag_name + "--conf '{'name':'v111'}' "
headers = {
'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
'Content-Type': 'text/plain'
}
conn.request("POST", "/aws_mwaa/cli", payload, headers)
res = conn.getresponse()
data = res.read()
dict_str = data.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
return base64.b64decode(mydata['stdout'])发布于 2022-04-27 16:00:24
payload可能有问题。下面的代码示例在DAG和--conf之间添加一个空格,并将JSON字符串中的引号更改为双引号。
conf = "{\"" + "name" + "\":\"" + "v111" + "\"}"
payload = "dags trigger " + dag_name + " --conf '{}'".format(conf)注意payload前后的不同值。
Before:
dags trigger tpda_test--conf '{'name':'v111'}'
After:
dags trigger tpda_test --conf '{"name":"v111"}'参考资料:在触发DAG (AWS)时添加配置
https://stackoverflow.com/questions/72029516
复制相似问题