代码之家  ›  专栏  ›  技术社区  ›  sedavidw

StepFunction映射失败的步骤上的Fork/Join

  •  0
  • sedavidw  · 技术社区  · 5 年前

    我有一个step函数,它从许多并行步骤(每个并行步骤都是lambda调用)开始,然后是完成一些最终处理的finalize步骤。

    理想情况下,我希望的是,如果任何并行步骤由于任何原因失败,那么所有当前步骤(以及将来的步骤)都将被取消,它们永远不会进入Finalize阶段,而是进入第三个状态(称为错误恢复)以执行不同的执行。这个工作流程可行吗?如果是这样,是否可以保证在进入恢复状态之前所有并行步骤都已停止?

    Step Function DAG

    阶跃函数定义

    {
      "Comment": "An example of the Amazon States Language using a map state to process elements of an array with a max concurrency of 2.",
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "ItemsPath": "$.items",
          "Parameters": {
            ...
          },
          "MaxConcurrency": 2,
          "Next": "Finalize",
          "Iterator": {
            "StartAt": "Parallel Step",
            "States": {
              "Parallel Step": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "FunctionName": "arn:aws:lambda:us-east-1:<>:function:lambda-parallel-step:$LATEST",
                  "Payload": {
                    "Input.$": "$"
                  }
                },
                "OutputPath": "$.Payload",
                "End": true
              }
            }
          }
        },
        "Finalize": {
          "Type": "Pass",
          "End": true
        }}}
    
    0 回复  |  直到 5 年前
        1
  •  4
  •   sedavidw    5 年前

    当我回到这个问题时,答案比我想象的要简单。你可以放一个 catch 整体上 Map Catch 陈述

    稍微修改我的输入

    {
      "Comment": "Pipeline to read data from S3 and index into Elasticsearch",
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "ItemsPath": "$.items",
          "Parameters": {
            ...
          },
          "ResultPath": "$.parallel-output",
          "MaxConcurrency": 6,
          "Next": "Finalize",
          "Iterator": {
            "StartAt": "Parallel",
            "States": {
              "Parallel": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
                  "Payload": {
                    "Input.$": "$"
                  }
                },
                "OutputPath": "$.Payload",
                "End": true
              }
            }
          },
          "Catch": [ {"ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "Cleanup State"}]
        },
        "Finalize": {
          "Type": "Task",
          "Resource": "arn:aws:states:::lambda:invoke",
          "Parameters": {
            "FunctionName": "arn:aws:lambda:us-east-1:<>:function:finalize:$LATEST",
            "Payload": {
              "Input.$": "$"
            }
          },
          "End": true
        },
        "Cleanup State": {
          "Type": "Task",
          "Resource": "arn:aws:states:::lambda:invoke",
          "Parameters": {
            "FunctionName": "arn:aws:lambda:us-east-1:<>:function:cleanup:$LATEST",
            "Payload": {
              "Input.$": "$"
            }
          },
          "Next": "Fail State"
        },
        "Fail State": {
          "Type": "Fail",
          "Error": "ErrorCode",
          "Cause": "Caused By Message"
        }
      }
    }
    

    DAG看起来像

    enter image description here