Add the following file to your project.
package com.databricks.spark.redshift

import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.s3.AmazonS3Client

// Lives in the com.databricks.spark.redshift package so it can use the
// package-private JDBCWrapper and DefaultSource classes directly.
object RedshiftReaderM {

  val endpoint = "s3.ap-south-1.amazonaws.com"

  // S3 client pinned to the regional endpoint (needed for regions that only
  // accept V4 request signing, such as ap-south-1).
  def getS3Client(provider: AWSCredentialsProvider): AmazonS3Client = {
    val client = new AmazonS3Client(provider)
    client.setEndpoint(endpoint)
    client
  }

  // Builds a BaseRelation via spark-redshift's DefaultSource and wraps it in a DataFrame.
  def getDataFrameForConfig(configs: Map[String, String],
                            sparkSession: SparkSession): DataFrame = {
    val sqlContext = sparkSession.sqlContext
    val source: DefaultSource = new DefaultSource(new JDBCWrapper(), getS3Client)
    val br: BaseRelation = source.createRelation(sqlContext, configs)
    sparkSession.baseRelationToDataFrame(br)
  }
}
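For this to compile, the spark-redshift library and the AWS SDK need to be on the classpath. A minimal build.sbt sketch follows; the exact versions are assumptions and should be matched to your Spark and Scala build, and the Amazon Redshift JDBC driver still has to be added separately (e.g. from AWS's Maven repository).

// build.sbt -- illustrative dependency list only; versions are assumptions
libraryDependencies ++= Seq(
  "com.databricks"    %% "spark-redshift"  % "3.0.0-preview1",
  "org.apache.hadoop" %  "hadoop-aws"      % "2.7.3",
  "com.amazonaws"     %  "aws-java-sdk-s3" % "1.11.271"
)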
Sample usage.
import com.databricks.spark.redshift._
// ap-south-1 (Mumbai) only accepts V4 request signing, so point s3a at the
// regional endpoint and enable V4 signing before touching the tempdir.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com")
System.setProperty("com.amazonaws.services.s3.enableV4", "true")

val options = Map(
  "query"        -> "select * from tbl limit 10",
  "url"          -> "jdbc:redshift://<redshift-host>:5439/<database>?user=<user>&password=<password>",
  "tempdir"      -> "s3a://bucket/ks1/ks2/",
  "aws_iam_role" -> "arn:aws:iam::<accountid>:role/<iam-role>"
)
val df = RedshiftReaderM.getDataFrameForConfig(options, spark)
df.show
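Note that aws_iam_role only covers the Redshift-side UNLOAD/COPY into the tempdir; the Spark side still needs S3 credentials to read those files back. If they are not already supplied by the default provider chain (instance profile, environment variables), they can be set on the Hadoop configuration. A minimal sketch with placeholder values:

// Placeholder credentials for the s3a filesystem; prefer instance profiles / IAM roles in production.
sc.hadoopConfiguration.set("fs.s3a.access.key", "<access-key-id>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<secret-access-key>")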
Source:
https://github.com/databricks/spark-redshift/issues/332