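I am using Apache Spark's Java API to load data from several sources (such as Cassandra and HDFS) so that I can visualize it in Apache Zeppelin. The code uses SparkSql and SparkSession, not SparkContext, and it was tested successfully in an IDE. I pasted the complete code (everything I ran in the IDE) into a Zeppelin notebook, and running it produced this error:
error: ';' expected but 'class' found
I think this is a Scala error, so I wonder whether Zeppelin supports the Spark Java API at all. If it does, how can I solve the problem?
Apache Zeppelin version: 0.7.3
The code is as follows: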
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;

public class LoadCSV {
    public static void main(String[] args) {
        // Options identifying the Cassandra table to read.
        HashMap<String, String> ops = new HashMap<>();
        ops.put("table", "grades");
        ops.put("keyspace", "zeppline");

        // Local Spark session configured with the Cassandra host and port.
        SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("Java Spark SQL basic example")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .getOrCreate();

        // Read the CSV file from HDFS, using the header row for column names
        // and inferring the column types.
        Dataset<Row> df1 = spark.read()
                .format("csv")
                .option("header", true)
                .option("inferSchema", true)
                .csv("hdfs://localhost:54310/Data/Zeppline/grades.csv");

        // Read the Cassandra table and select the columns of interest.
        Dataset<Row> df2 = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .options(ops)
                .load()
                .select("id", "first_name", "last_name", "ssn", "test1", "test2", "test3", "test4", "final", "grade");

        // Combine both datasets and register the result as the temporary view "grades".
        df1.union(df2).createOrReplaceTempView("grades");
    }
}