
sparklyr connection to an S3 bucket throws errors

  •  3
  • Sundar N  · asked 7 years ago

    I am trying to connect to an S3 bucket from R using sparklyr. I can read local files into the Spark context without problems, but reading from S3 throws a large number of errors.

    Note: the S3 bucket contains multiple CSV files that all follow the same schema.

    library(sparklyr)
    library(tidyverse)

    sparklyr::spark_install(version = "2.0.2", hadoop_version = "2.7")

    Sys.setenv(AWS_ACCESS_KEY_ID = "xxxx")
    Sys.setenv(AWS_SECRET_ACCESS_KEY = "xxxx")
    Sys.setenv(AWS_DEFAULT_REGION = "ap-southeast-1")

    Spark_config <- sparklyr::spark_config()
    sc <- sparklyr::spark_connect(master = "local", config = Spark_config)
    files <- "s3n://temp-sg/MVC"
    temp <- spark_read_csv(sc, name = "MVC", path = files, infer_schema = TRUE)
    spark_disconnect(sc)
    

    Any help is greatly appreciated.

    Below is the error dump when using s3a:// (followed by the same error with s3n://):

    Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a:
        at org.apache.hadoop.fs.Path.initialize(Path.java:206)
        at org.apache.hadoop.fs.Path.<init>(Path.java:172)
        at org.apache.hadoop.fs.Path.<init>(Path.java:94)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
        at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
        at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
        at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
        at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
        at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
        at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at sparklyr.Invoke$.invoke(invoke.scala:94)
        at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
        at sparklyr.StreamHandler$.read(stream.scala:55)
        at sparklyr.BackendHandler.channelRead0(handler.scala:49)
        at sparklyr.BackendHandler.channelRead0(handler.scala:14)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Unknown Source)
    Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a:
        at java.net.URI$Parser.fail(Unknown Source)
        at java.net.URI$Parser.failExpecting(Unknown Source)
        at java.net.URI$Parser.parse(Unknown Source)
        at java.net.URI.<init>(Unknown Source)
        at org.apache.hadoop.fs.Path.initialize(Path.java:203)
        ... 58 more
    

    Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n:
            at org.apache.hadoop.fs.Path.initialize(Path.java:206)
            at org.apache.hadoop.fs.Path.<init>(Path.java:172)
            at org.apache.hadoop.fs.Path.<init>(Path.java:94)
            at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
            at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
            at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
            at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
            at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
            at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
            at scala.Option.getOrElse(Option.scala:120)
            at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
            at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
            at scala.Option.getOrElse(Option.scala:120)
            at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
            at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
            at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
            at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
            at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
            at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
            at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
            at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
            at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
            at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
            at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.lang.reflect.Method.invoke(Unknown Source)
            at sparklyr.Invoke$.invoke(invoke.scala:94)
            at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
            at sparklyr.StreamHandler$.read(stream.scala:55)
            at sparklyr.BackendHandler.channelRead0(handler.scala:49)
            at sparklyr.BackendHandler.channelRead0(handler.scala:14)
            at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
            at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
            at java.lang.Thread.run(Unknown Source)
        Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n:
            at java.net.URI$Parser.fail(Unknown Source)
            at java.net.URI$Parser.failExpecting(Unknown Source)
            at java.net.URI$Parser.parse(Unknown Source)
            at java.net.URI.<init>(Unknown Source)
            at org.apache.hadoop.fs.Path.initialize(Path.java:203)
            ... 58 more
    
    2 Answers  ·  7 years ago
        1
  •  1
  •   Sundar N    7 years ago

    Solved the problem. Note: Spark version 2.0, Hadoop version 2.7.

    # install.packages("devtools")
    # devtools::install_github("rstudio/sparklyr")

    library(sparklyr)
    library(dplyr)

    # Put the hadoop-aws package on the classpath so the s3a filesystem is available
    conf <- spark_config()
    conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
    # conf$spark.executor.memory <- "4g"
    sc <- spark_connect(master = "local", config = conf)

    # Get the Spark context
    ctx <- sparklyr::spark_context(sc)

    # Use the Spark context to obtain the Java Spark context
    jsc <- invoke_static(
      sc,
      "org.apache.spark.api.java.JavaSparkContext",
      "fromSparkContext",
      ctx
    )

    # Set the S3 credentials on the Hadoop configuration
    hconf <- jsc %>% invoke("hadoopConfiguration")
    hconf %>% invoke("set", "fs.s3a.access.key", "xxxx")
    hconf %>% invoke("set", "fs.s3a.secret.key", "xxxx")

    # Check that the Spark session is active
    sparklyr::spark_connection_is_open(sc = sc)

    small_file <- "s3a://temp-sg/MVC"

    temp <- spark_read_csv(sc, name = "MVC", path = small_file, infer_schema = TRUE)
    spark_disconnect(sc)
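
    Before calling spark_disconnect(), a quick sanity check can confirm that the data really came back from S3 (a minimal sketch; temp is the table returned by spark_read_csv() above, and sdf_nrow() and the dplyr verbs are standard sparklyr/dplyr calls):

    # count rows on the Spark side, then pull a few rows back into R
    sdf_nrow(temp)
    temp %>% head(5) %>% collect()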
    
        2
  •  0
  •   JanLauGe    7 years ago

    Without seeing your exact error message it is hard to say what is going wrong. One thing I noticed, however, is that you are using s3n rather than s3a. Try s3a instead:

    files <- 's3a://temp-sg/MVC'
    temp <- spark_read_csv(sc, 
      name = 'MVC', 
      path = files,
      infer_schema = TRUE)
    

    See also this post.
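
    If switching the scheme alone does not help, the s3a filesystem classes may simply be missing from the classpath. A minimal sketch of pulling in the hadoop-aws package before connecting (assuming Hadoop 2.7.x, matching the first answer above; the artifact version is an assumption to adjust for your install):

    library(sparklyr)

    conf <- spark_config()
    # assumption: Hadoop 2.7.x, so hadoop-aws 2.7.3 is used here
    conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
    sc <- spark_connect(master = "local", config = conf)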