代码之家  ›  专栏  ›  技术社区  ›  SCAuFish

PySpark-挂起rdd操作(管道、过滤器等)

  •  0
  • SCAuFish  · 技术社区  · 2 年前

    在调查一个错误时,我注意到PySpark rdd操作中存在一个潜在的错误。在下面的PySpark代码中( https://github.com/apache/spark/blob/27455aee8e6c671dcfee757771be6cdd58c9b001/python/pyspark/rdd.py#L1119 )

    def pipe(
        self, command: str, env: Optional[Dict[str, str]] = None, checkCode: bool = False
    ) -> "RDD[str]":
        # ...
        if env is None:
            env = dict()
    
        def func(iterator: Iterable[T]) -> Iterable[str]:
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
    
            def pipe_objs(out: IO[bytes]) -> None:
                for obj in iterator:
                    s = str(obj).rstrip("\n") + "\n"
                    out.write(s.encode("utf-8"))
                out.close()
    
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
    

    单独的线程通过迭代器循环,并将输出发布到定义的管道。然而,如果在迭代过程中遇到异常,异常将被抛出线程(第行) for obj in iterator: )而且没有被主线抓住。 此外,它还会阻止管道关闭(管线) out.close() )! 整个输出管将挂起,等待超时。

    关于我是如何得出这个假设的一些回溯

    我在PySpark 2.4的一个Spark作业中遇到了以下异常,我正在执行rdd。过滤器(lambda行:len(行.语句)<最小长度)

    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
        self.run()
      File "/usr/lib/python3.7/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/mnt/spark/app/apache-spark/python/lib/pyspark.zip/pyspark/rdd.py", line 758, in pipe_obis
      File "/mnt/spark/app/apache-spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "/mnt/app/pyspark-app/app.zip/my_pyspark_task.py", line 43, in <lambda>
        len(row.sentence) < min_length
    TypeError: object of type 'NoneType' has no len()
    

    然而,这个异常并没有被驱动程序捕获,或者导致整个Spark任务失败。相反,整个任务仍然悬而未决!

    这对你们大家有意义吗?现在我知道我的数据存在一些问题,但如果不失败就挂起来,会让人们混淆它是失败了还是只是运行了很长时间。如果它是一个bug,我们可能应该修复它。

    0 回复  |  直到 2 年前