在调查一个错误时,我注意到PySpark rdd操作中存在一个潜在的错误。在下面的PySpark代码中(
https://github.com/apache/spark/blob/27455aee8e6c671dcfee757771be6cdd58c9b001/python/pyspark/rdd.py#L1119
)
def pipe(
self, command: str, env: Optional[Dict[str, str]] = None, checkCode: bool = False
) -> "RDD[str]":
# ...
if env is None:
env = dict()
def func(iterator: Iterable[T]) -> Iterable[str]:
pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
def pipe_objs(out: IO[bytes]) -> None:
for obj in iterator:
s = str(obj).rstrip("\n") + "\n"
out.write(s.encode("utf-8"))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
单独的线程通过迭代器循环,并将输出发布到定义的管道。然而,如果在迭代过程中遇到异常,异常将被抛出线程(第行)
for obj in iterator:
)而且没有被主线抓住。
此外,它还会阻止管道关闭(管线)
out.close()
)! 整个输出管将挂起,等待超时。
关于我是如何得出这个假设的一些回溯
我在PySpark 2.4的一个Spark作业中遇到了以下异常,我正在执行rdd。过滤器(lambda行:len(行.语句)<最小长度)
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/mnt/spark/app/apache-spark/python/lib/pyspark.zip/pyspark/rdd.py", line 758, in pipe_obis
File "/mnt/spark/app/apache-spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "/mnt/app/pyspark-app/app.zip/my_pyspark_task.py", line 43, in <lambda>
len(row.sentence) < min_length
TypeError: object of type 'NoneType' has no len()
然而,这个异常并没有被驱动程序捕获,或者导致整个Spark任务失败。相反,整个任务仍然悬而未决!
这对你们大家有意义吗?现在我知道我的数据存在一些问题,但如果不失败就挂起来,会让人们混淆它是失败了还是只是运行了很长时间。如果它是一个bug,我们可能应该修复它。