代码之家  ›  专栏  ›  技术社区  ›  fuggy_yama

如何使用Boto3在AWS EMR集群中等待步骤完成

  •  6
  • fuggy_yama  · 技术社区  · 7 年前

    EMR Waiters

    4 回复  |  直到 7 年前
        1
  •  10
  •   Rich Smith ely    6 年前

    现在有一个服务员可以为步骤完成活动服务。它是在最近的boto3版本中添加的。

    http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Waiter.StepComplete

    示例代码:

    import boto3
    
    client = boto3.client("emr")
    waiter = client.get_waiter("step_complete")
    waiter.wait(
        ClusterId='the-cluster-id',
        StepId='the-step-id',
        WaiterConfig={
            "Delay": 30,
            "MaxAttempts": 10
        }
    )
    
        2
  •  4
  •   helloV    7 年前

    Boto3中没有内置函数。但你可以自己编写服务员。

    describe_step

    呼叫 describe_step 具有 cluster_id step_id

    'State': 'PENDING'|'CANCEL_PENDING'|'RUNNING'|'COMPLETED'|'CANCELLED'|'FAILED'|'INTERRUPTED'
    
        3
  •  4
  •   fuggy_yama    7 年前

    我想出了以下代码(如果您设置 max_attempts 设置为0或更小,则只需等待,直到没有运行/挂起的步骤):

    def wait_for_steps_completion(emr_client, emr_cluster_id, max_attempts=0):
        sleep_seconds = 30
        num_attempts = 0
    
        while True:
            response = emr_client.list_steps(
                ClusterId=emr_cluster_id,
                StepStates=['PENDING', 'CANCEL_PENDING', 'RUNNING']
            )
            num_attempts += 1
            active_aws_emr_steps = response['Steps']
    
            if active_aws_emr_steps:
                if 0 < max_attempts <= num_attempts:
                    raise Exception(
                        'Max attempts exceeded while waiting for AWS EMR steps completion. Last response:\n'
                        + json.dumps(response, indent=3, default=str)
                    )
                time.sleep(sleep_seconds)
            else:
                return
    
        4
  •  0
  •   Laren Crawford    4 年前

    GitHub .

    status_poller函数循环并调用函数,打印“.”或新状态,直到返回指定状态:

    def status_poller(intro, done_status, func):
        """
        Polls a function for status, sleeping for 10 seconds between each query,
        until the specified status is returned.
        :param intro: An introductory sentence that informs the reader what we're
                      waiting for.
        :param done_status: The status we're waiting for. This function polls the status
                            function until it returns the specified status.
        :param func: The function to poll for status. This function must eventually
                     return the expected done_status or polling will continue indefinitely.
        """
        status = None
        print(intro)
        print("Current status: ", end='')
        while status != done_status:
            prev_status = status
            status = func()
            if prev_status == status:
                print('.', end='')
            else:
                print(status, end='')
            sys.stdout.flush()
            time.sleep(10)
        print()
    

    status_poller(
        "Waiting for step to complete...",
        'COMPLETED',
        lambda:
        emr_basics.describe_step(cluster_id, step_id, emr_client)['Status']['State'])