我试图使用状态机语言中定义的并行和Catch块在我的step函数流中添加错误处理。
以下是我的step函数的流程图:

由于我希望为所有step函数设置一个公共错误处理程序,所以我已经将它们封装在一个并行块中,并添加了一个公共Catch块来捕获任何step函数中的任何错误。在查看各种示例和博客时,我遵循了这链接并实现了类似的方法。
我观察到的是,每当任何状态引发异常时,该控件都会进入catch块。catch块的输入是引发的异常,该异常包含JSON对象中的错误和原因。由于我想要错误以及传递给该状态的输入,所以我在catch块中添加了ResultPath作为"$.error"。下面是定义状态机的JSON规范。
{
"StartAt": "Try",
"States": {
"Try": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Step-1",
"States": {
"Step-1": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-1-lambda",
"Next": "Step-2"
},
"Step-2": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.some_variable",
"StringEquals": "some_string",
"Next": "Step-3"
},
{
"Variable": "$.some_variable",
"StringEquals": "some_other_string",
"Next": "Step-4"
}
],
"Default": "Step-6"
},
"Step-3": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-3-lambda",
"Next": "Step-6"
},
"Step-4": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-4-lambda",
"Next": "Step-6"
},
"Step-6": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-6-lambda",
"End": true
}
}
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"ResultPath": "$.error",
"Next": "ErrorHandler"
}
],
"Next": "UnwrapOutput"
},
"UnwrapOutput": {
"Type": "Pass",
"InputPath": "$[0]",
"End": true
},
"ErrorHandler": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:1234:function:step-7-lambda",
"End": true
}
}
}例如,假设步骤4生成异常。这种状态的输入是:
{
"foo": "abc",
"bar": "def"
}用来触发状态机的输入是:
{
"boo": "jkl",
"baz": "mno"
}在ErrorHandler中,当步骤4生成一个异常时,我期望ErrorHandler状态的输入是:
{
"foo": "abc",
"bar": "def",
"error": {
"Error": "SomeError",
"Cause": "SomeCause"
}
}但是,接收到的输入由用于触发流的原始输入组成。
{
"boo": "jkl",
"baz": "mno",
"error": {
"Error": "SomeError",
"Cause": "SomeCause"
}
}我需要访问导致ErrorHandler异常的状态的输入字段。它使用"$“提供用于触发流的输入。我有办法做到这一点吗?
任何帮助都将不胜感激,我很久以来一直在努力解决这个问题。
发布于 2021-06-07 14:44:25
我只晚了10个月,没有那么多哈哈,但我希望你已经找到了一个解决方案,在任何情况下,我将分享我的两分钱,这样我可以帮助另一个开发,甚至更好,有人可以告诉我一个更好的方法来做这件事!
首先,让我们看看我们有哪些场景:
我们的目标:访问触发错误的作业。
第一种解决方案-适用于所有方案:
第二个解决方案-应用于Sych作业执行
但为什么这很重要?
第三种解决方案--应用于同步作业执行
如果没有任何抽象,就会出现类似于“使用CDK”的情况。
const job1 = new tasks.LambdaInvoke(scope, 'First Job -- PASS', {
lambdaFunction: function1,
outputPath: '$.Payload'
})
const job2 = new tasks.LambdaInvoke(scope, 'Second Job -- PASS', {
lambdaFunction: function2,
outputPath: '$.Payload'
})
const job3 = new tasks.LambdaInvoke(scope, 'Third Job -- PASS', {
lambdaFunction: function3,
outputPath: '$.Payload'
})
const generateHandleErrorJob = () => new tasks.LambdaInvoke(scope, `Handle Error Job ${Math.random() * 160000000}`, {
lambdaFunction: functionError,
outputPath: '$.Payload'
})
const jobToThrowError = new tasks.LambdaInvoke(scope, 'Job To Throw Error -- PASS', {
lambdaFunction: fucntionThrowError,
outputPath: '$.Payload',
})
const generatePassCheckSetep = (stepName: string) => new sfn.Pass(scope, `Pass: ${stepName}`, {
resultPath: '$.step-info',
result: sfn.Result.fromObject({
step: stepName
})
})
const definition = new sfn.Parallel(scope, 'Parallel Execution -- PASS')
.branch(generatePassCheckSetep('job1').next(job1.addCatch(generateHandleErrorJob(), {resultPath: '$.error-info'})))
.branch(generatePassCheckSetep('jobToThrowError').next(jobToThrowError.addCatch(generateHandleErrorJob(), {resultPath: '$.error-info'})))
.branch(generatePassCheckSetep('job2').next(job2.addCatch(generateHandleErrorJob(), {resultPath: '$.error-info'})))
.next(job3)
new sfn.StateMachine(scope, id, {
definition,
timeout: cdk.Duration.minutes(3)
})但是我还创建了一个抽象的"ParallelStateMachineCatch“,这样您就可以像这样使用:
this.definition = new ParallelStateMachineCatch(this,
}, handleErrorFunction)
.branchCatch(job1)
.branchCatch(job2)
.branchCatch(job3)
.branchCatch(job4)
.branchCatch(job5)
.branchCatch(job6)
.next(final)}
以下是ParallelStateMachineCatch代码:
import { Construct, Duration } from 'monocdk'
import { NodejsFunction } from 'monocdk/aws-lambda-nodejs'
import { Pass,Result, Parallel, ParallelProps } from 'monocdk/aws-stepfunctions'
import { LambdaInvoke } from 'monocdk/aws-stepfunctions-tasks'
export interface DefinitionProps {
sonosEnvironment: string
region: string
accountNumber: string
}
export class ParallelStateMachineCatch extends Parallel {
private errorHandler: NodejsFunction
constructor(scope: Construct, id: string, props: ParallelProps, errorHandler: NodejsFunction) {
super(scope, id, props)
this.errorHandler = errorHandler
}
branchCatch(task: LambdaInvoke): ParallelStateMachineCatch {
const randomId = Math.random().toString().replace('0.', '')
const passInputJob = ParallelStateMachineCatch.generatePassInput(this, task.id, randomId)
const handleErrorJob = ParallelStateMachineCatch.generateHandleErrorJob(this, this.errorHandler, randomId)
const resultPath = '$.error-info'
this.branch(passInputJob.next(task.addCatch(handleErrorJob, { resultPath })))
return this
}
private static generateHandleErrorJob(scope: Construct, errorHandler: NodejsFunction, randomId: string): LambdaInvoke {
return new LambdaInvoke(scope, `Handle Error ${ randomId }`, {
lambdaFunction: errorHandler,
outputPath: '$.Payload',
timeout: Duration.seconds(5),
})
}
private static generatePassInput(scope: Construct, stepName: string, randomId: string): Pass {
return new Pass(scope, `Pass Input ${ randomId }`, {
resultPath: '$.step-info',
result: Result.fromObject({
name: stepName
})
})
}
}不管怎样,我希望我能在这件事上帮助别人,这就是我解决这个问题的方法。请随便教我更好的方法!Tks好运和好代码
https://stackoverflow.com/questions/63303181
复制相似问题