
Pyspark pickling error when trying to upload a numpy array to S3 using boto3

  •  -2
  • dks551  · Tech community  · 6 years ago

    I am trying to upload my numpy array to S3 using a boto3 client in my PySpark application, but it gives me a pickling error. Below is my code.

    # imports implied by the snippet (Keras is assumed for the VGG16 helpers)
    import os
    import logging
    from io import BytesIO

    import boto3
    import numpy as np
    from keras.preprocessing import image
    from keras.applications.vgg16 import preprocess_input

    LOGGER = logging.getLogger(__name__)

    # initVGG16, get_file_name_without_ext, OUTPUT and FORMAT_NAME are
    # defined elsewhere in the application

    def write_features3(model, key, obj, output_path, format_name):
        try:
            LOGGER.info('executing vgg16 feature extractor...')
            img = image.load_img(BytesIO(obj), target_size=(224, 224, 3))
            img_data = image.img_to_array(img)
            img_data = np.expand_dims(img_data, axis=0)
            img_data = preprocess_input(img_data)
            vgg16_feature = model.predict(img_data)[0]
            LOGGER.info('vgg16 feature shape: %s', vgg16_feature.shape)

            file_name_without_ext = get_file_name_without_ext(key)
            rest_of_path = OUTPUT.split('/', 1)[1]
            s3_full_path = rest_of_path + '/' + file_name_without_ext + '.npy'
            LOGGER.info("Saving to S3....")
            feature_dir = '/home/hadoop'
            s3 = boto3.client('s3', region_name='us-east-1')
            local_dir_full_path = feature_dir + '/' + file_name_without_ext + '.npy'
            np.save(local_dir_full_path, vgg16_feature)
            s3.upload_file(local_dir_full_path, 'test', s3_full_path)
            os.remove(local_dir_full_path)
        except Exception as e:
            print('Error......{}'.format(e.args))
            return []
    
    def write_features_(xs):
        model_data = initVGG16()

        for k, v in xs:
            yield k, write_features3(model_data, k, v, OUTPUT, FORMAT_NAME)

    # driver program
    s3_files_rdd = sc.binaryFiles('s3n://....')
    # foreachPartition is an action that returns None, so its result is not assigned
    s3_files_rdd.foreachPartition(write_features_)
    

    When I run this program, I get the error below. I even tried creating the s3 client inside the write_features_ partition method, but with no luck; same error. Spark version - 2.2.1
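    For context, the `TypeError: can't pickle thread.lock objects` shown below generally means that some object captured by the closure Spark ships to the executors (for example a boto3 client, or a Keras/TensorFlow model whose session holds a lock) is not serializable. The usual pattern, which the code above already mostly follows, is to build every such object inside the partition function so that only the function's code is pickled. A minimal sketch of that pattern; `process_partition`, `extract_features` and `upload_features` are hypothetical names:

    import boto3

    def process_partition(records):
        # Build the non-picklable objects here, on the executor, so the
        # live S3 client and model never travel through the pickler.
        s3 = boto3.client('s3', region_name='us-east-1')
        model = initVGG16()
        for key, payload in records:
            features = extract_features(model, payload)  # hypothetical helper
            upload_features(s3, key, features)           # hypothetical helper

    s3_files_rdd.foreachPartition(process_partition)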

    Error:

    n save_reduce
        save(state)
      File "/usr/lib64/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
        save(v)
      File "/usr/lib64/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
        save(tmp[0])
      File "/usr/lib64/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib64/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
        save(v)
      File "/usr/lib64/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib64/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
        save(v)
      File "/usr/lib64/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
    TypeError: can't pickle thread.lock objects
    Traceback (most recent call last):
      File "six_file_boto3_write1.py", line 249, in <module>
        run()
      File "six_file_boto3_write1.py", line 227, in run
        s3_files_rdd.foreachPartition(write_features_)
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
    pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
    
    1 Reply  |  last activity 6 years ago
        1
  •  0
  •   dks551    6 years ago

    The problem was the Spark version; I was using spark-2.2.1. I upgraded to spark-2.3.2 and everything works fine.
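
    As a side note, and independent of the Spark version, the temporary-file round trip in write_features3 can be avoided by serializing the array into an in-memory buffer and streaming it to S3 with upload_fileobj. A minimal sketch, assuming the same bucket name ('test') and key layout as the question:

    from io import BytesIO

    import boto3
    import numpy as np

    def save_array_to_s3(arr, bucket, s3_key):
        # serialize the array to an in-memory .npy buffer instead of a
        # local file, then stream it straight to S3
        buf = BytesIO()
        np.save(buf, arr)
        buf.seek(0)
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.upload_fileobj(buf, bucket, s3_key)

    # e.g. save_array_to_s3(vgg16_feature, 'test', s3_full_path)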