当我回到这个问题时,答案比我想象的要简单。你可以放一个
catch
整体上
Map
Catch
陈述
稍微修改我的输入
{
"Comment": "Pipeline to read data from S3 and index into Elasticsearch",
"StartAt": "Map",
"States": {
"Map": {
"Type": "Map",
"ItemsPath": "$.items",
"Parameters": {
...
},
"ResultPath": "$.parallel-output",
"MaxConcurrency": 6,
"Next": "Finalize",
"Iterator": {
"StartAt": "Parallel",
"States": {
"Parallel": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
"Payload": {
"Input.$": "$"
}
},
"OutputPath": "$.Payload",
"End": true
}
}
},
"Catch": [ {"ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "Cleanup State"}]
},
"Finalize": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:<>:function:finalize:$LATEST",
"Payload": {
"Input.$": "$"
}
},
"End": true
},
"Cleanup State": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:<>:function:cleanup:$LATEST",
"Payload": {
"Input.$": "$"
}
},
"Next": "Fail State"
},
"Fail State": {
"Type": "Fail",
"Error": "ErrorCode",
"Cause": "Caused By Message"
}
}
}
DAG看起来像