I created a state machine with the CDK using the following Python code:
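from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks

# Defined inside a Stack; orderbook_file_splitter and
# orderbook_chunk_processor are existing Lambda functions.
orderbook_upload_state_machine = sfn.StateMachine(
    self, "orderbook_upload_state_machine",
    definition=tasks.LambdaInvoke(
        self, "orderbook_file_splitter_task",
        lambda_function=orderbook_file_splitter
    ).next(
        sfn.Map(self, "orderbook_chunk_processor_map").iterator(
            tasks.LambdaInvoke(
                self, "orderbook_chunk_processor_task",
                lambda_function=orderbook_chunk_processor
            )
        )
    )
)
In the AWS console, I noticed that the code above generated the following ASL: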
{
  "StartAt": "orderbook_file_splitter_task",
  "States": {
    "orderbook_file_splitter_task": {
      "Next": "orderbook_chunk_processor_map",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:eu-west-1:xxxxxxxxxxxx:function:orderbook_file_splitter",
        "Payload.$": "$"
      }
    },
    "orderbook_chunk_processor_map": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "orderbook_chunk_processor_task",
        "States": {
          "orderbook_chunk_processor_task": {
            "End": true,
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:eu-west-1:xxxxxxxxxxxx:function:orderbook_chunk_processor",
              "Payload.$": "$"
            }
          }
        }
      }
    }
  }
}
However, executing the state machine fails with the following error:
{
  "error": "States.Runtime",
  "cause": "Reference path \"$\" must point to array."
}
What is going on?
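I noticed that if I edit the generated ASL in the console as shown below, the execution succeeds. However, I don't know how to translate these changes back into CDK code.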
{
  "StartAt": "orderbook_file_splitter_task",
  "States": {
    "orderbook_file_splitter_task": {
      "Next": "orderbook_chunk_processor_map",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxxxxx:function:orderbook_file_splitter"
      // ⭐⭐ I removed the 'Parameters' block and put the function ARN here
    },
    "orderbook_chunk_processor_map": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "orderbook_chunk_processor_task",
        "States": {
          "orderbook_chunk_processor_task": {
            "End": true,
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:eu-west-1:xxxxxxxxxxxx:function:orderbook_chunk_processor",
              "Payload.$": "$"
            }
          }
        }
      }
    }
  }
}
Posted on 2022-03-24 16:59:56
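I found the solution by following a related tutorial: add the payload_response_only property to the first Lambda task definition. By default, LambdaInvoke returns the full invoke response, with the function's return value nested under Payload, so the Map state receives an object instead of an array.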
orderbook_upload_state_machine = sfn.StateMachine(
    self, "orderbook_upload_state_machine",
    definition=tasks.LambdaInvoke(
        self, "orderbook_file_splitter_task",
        lambda_function=orderbook_file_splitter,
        payload_response_only=True
    ).next(
        sfn.Map(self, "orderbook_chunk_processor_map").iterator(
            tasks.LambdaInvoke(
                self, "orderbook_chunk_processor_task",
                lambda_function=orderbook_chunk_processor
            )
        )
    )
)
The generated ASL now looks like this:
{
  "StartAt": "orderbook_file_splitter_task",
  "States": {
    "orderbook_file_splitter_task": {
      "Next": "orderbook_chunk_processor_map",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:xxxxxxxxxxxx:function:adam-hyperchain-orderbook_file_splitter"
    },
    "orderbook_chunk_processor_map": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "orderbook_chunk_processor_task",
        "States": {
          "orderbook_chunk_processor_task": {
            "End": true,
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:eu-west-1:xxxxxxxxxxxx:function:adam-hyperchain-orderbook_chunk_processor",
              "Payload.$": "$"
            }
          }
        }
      }
    }
  }
}
Edit: this change (output_path) also solves the problem:
orderbook_upload_state_machine = sfn.StateMachine(
    self, "orderbook_upload_state_machine",
    definition=tasks.LambdaInvoke(
        self, "orderbook_file_splitter_task",
        lambda_function=orderbook_file_splitter,
        output_path="$.Payload"
    ).next(
        sfn.Map(self, "orderbook_chunk_processor_map").iterator(
            tasks.LambdaInvoke(
                self, "orderbook_chunk_processor_task",
                lambda_function=orderbook_chunk_processor
            )
        )
    )
)
https://stackoverflow.com/questions/71606093
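To see why both changes work, it may help to compare the shape of the data that reaches the Map state in each case. The following is only a rough simulation in plain Python, not Step Functions itself: the chunk values and the `map_items` helper are made up for illustration, and the default invoke response is abbreviated (the real one carries additional metadata fields).

```python
# Simulated data shapes only; no AWS calls are made.
# Hypothetical return value of the file splitter Lambda.
chunks = [{"chunk": 1}, {"chunk": 2}]

# Default LambdaInvoke (Resource "arn:aws:states:::lambda:invoke"):
# the task output is the invoke response, and the function's return
# value is nested under "Payload". Other metadata fields are omitted.
default_output = {
    "ExecutedVersion": "$LATEST",
    "Payload": chunks,
    "StatusCode": 200,
}

# payload_response_only=True: the task calls the function ARN directly,
# so the task output is the function's return value itself.
payload_only_output = chunks

# output_path="$.Payload": the invoke response is produced as usual,
# then only the "Payload" field is passed on to the next state.
with_output_path = default_output["Payload"]


def map_items(state_input):
    """Hypothetical stand-in for the Map state's check that "$" is an array."""
    if not isinstance(state_input, list):
        raise TypeError('Reference path "$" must point to array.')
    return state_input


for label, value in [
    ("default", default_output),
    ("payload_response_only", payload_only_output),
    ("output_path", with_output_path),
]:
    try:
        map_items(value)
        print(label, "-> ok")
    except TypeError as exc:
        print(label, "-> failed:", exc)
```

In this sketch only the default configuration fails, because the Map state is handed the whole response object rather than the list inside it.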