代码之家  ›  专栏  ›  技术社区  ›  Jared

如何在本地模式下运行的pyspark中读取S3?

  •  5
  • Jared  · 技术社区  · 6 年前

    我正在使用PyCharm 2018.1,它使用Python 3.4,Spark 2.3通过pip安装在virtualenv中。本地主机上没有hadoop安装,因此没有Spark安装(因此没有Spark\u HOME、hadoop\u HOME等)

    当我尝试时:

    from pyspark import SparkConf
    from pyspark import SparkContext
    conf = SparkConf()\
        .setMaster("local")\
        .setAppName("pyspark-unittests")\
        .set("spark.sql.parquet.compression.codec", "snappy")
    sc = SparkContext(conf = conf)
    inputFile = sparkContext.textFile("s3://somebucket/file.csv")
    

    我得到:

    py4j.protocol.Py4JJavaError: An error occurred while calling o23.partitions.
    : java.io.IOException: No FileSystem for scheme: s3
    

    如果没有完整的Hadoop本地安装,如何在本地模式下运行pyspark时读取s3?

    FWIW-当我在非本地模式的EMR节点上执行它时,这非常有效。

    以下操作不起作用(相同的错误,尽管它确实解决并下载了依赖项):

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.1.0" pyspark-shell'
    from pyspark import SparkConf
    from pyspark import SparkContext
    conf = SparkConf()\
        .setMaster("local")\
        .setAppName("pyspark-unittests")\
        .set("spark.sql.parquet.compression.codec", "snappy")
    sc = SparkContext(conf = conf)
    inputFile = sparkContext.textFile("s3://somebucket/file.csv")
    

    相同(错误)结果:

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/hadoop-aws-3.1.0.jar" pyspark-shell'
    from pyspark import SparkConf
    from pyspark import SparkContext
    conf = SparkConf()\
        .setMaster("local")\
        .setAppName("pyspark-unittests")\
        .set("spark.sql.parquet.compression.codec", "snappy")
    sc = SparkContext(conf = conf)
    inputFile = sparkContext.textFile("s3://somebucket/file.csv")
    
    3 回复  |  直到 6 年前
        1
  •  9
  •   Tarun Lalwani    6 年前

    所以Glennie的答案很接近,但在你的情况下什么都不起作用。关键是选择依赖项的正确版本。如果您查看虚拟环境

    Jars

    一切都指向一个版本 2.7.3 ,您还需要使用

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
    

    您应该通过检查路径来验证安装使用的版本 venv/Lib/site-packages/pyspark/jars 在项目的虚拟环境中

    之后,您可以使用 s3a 默认情况下或 s3 通过为相同的

    # Only needed if you use s3://
    sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'awsKey')
    sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'awsSecret')
    s3File = sc.textFile("s3a://myrepo/test.csv")
    
    print(s3File.count())
    print(s3File.id())
    

    输出如下

    OutputSpark

        2
  •  3
  •   Glennie Helles Sindholt    6 年前

    您应该使用 s3a 本地访问S3时的协议。确保将密钥和机密添加到 SparkContext 第一像这样:

    sc = SparkContext(conf = conf)
    sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'awsKey')
    sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'awsSecret')
    
    inputFile = sparkContext.textFile("s3a://somebucket/file.csv")
    
        3
  •  1
  •   buxizhizhoum    6 年前

    准备工作:

    在spark配置文件中添加以下行,对于我的本地pyspark,它是 /usr/local/spark/conf/spark-default.conf

    spark.hadoop.fs.s3a.access.key=<your access key>
    spark.hadoop.fs.s3a.secret.key=<your secret key>
    

    python文件内容:

    from __future__ import print_function
    import os
    
    from pyspark import SparkConf
    from pyspark import SparkContext
    
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
    
    
    if __name__ == "__main__":
    
        conf = SparkConf().setAppName("read_s3").setMaster("local[2]")
        sc = SparkContext(conf=conf)
    
        my_s3_file3 = sc.textFile("s3a://store-test-1/test-file")
        print("file count:", my_s3_file3.count())
    

    提交:

    spark-submit --master local \
    --packages org.apache.hadoop:hadoop-aws:2.7.3,\
    com.amazonaws:aws-java-sdk:1.7.4,\
    org.apache.hadoop:hadoop-common:2.7.3 \
    <path to the py file above>