我用Mongo连接器编写了一段Java Spark代码。它应该从MongoDB中获取满足以下条件的所有行:列
createdDate
大于上一次运行时的
createdDate
(即我在Oracle中为每次运行保存的高水位线值。Oracle中的初始高水位线值是
1900-01-01 00:00:00.000
)。
该列
createdDate
在MongoDB中的类型是
ISODate
。
在我的MongoDB数据中,列
createdDate
存储的最大值是
2018-04-11 01:43:20.165
。
但是代码中的
filter
没有按预期工作:第一次运行时,它有时只抓取到
2018-03-30 21:48:59.519
为止的记录,之后在第二次或第三次运行时才抓取到最大值(
2018-04-11 01:43:20.165
)。
理想情况下,当初始高水位线值为
1900-01....
时,第一次运行就应该抓取到最大值。
代码如下:
package mongo;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.sql.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.bson.Document;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import java.sql.Timestamp;
/**
 * Spark job that exports a MongoDB collection as JSON text to an output path.
 *
 * <p>Two modes, selected by the eighth command-line argument:
 * <ul>
 *   <li>{@code "I"} (case-insensitive): incremental. Reads the previous high water mark for
 *       the table from Oracle, exports only rows whose high-water column is greater than it,
 *       and then stores the maximum exported value back as the new high water mark.</li>
 *   <li>anything else: full export of the loaded dataset.</li>
 * </ul>
 */
public final class MongoRead
{
    /**
     * Oracle connection opened by {@link #readHighWaterMark} and reused by
     * {@link #setHighWaterMark}. It stays {@code null} when no incremental load is run.
     */
    private static Connection con = null;

    /**
     * Opens the Oracle connection and reads the stored high water mark for a table.
     *
     * @param table   value matched against the {@code table_nm} column
     * @param oraConn JDBC URL of the Oracle database
     * @param oraUser database user
     * @param oraPswd database password
     * @return the third column of the last matching row, or {@code ""} when no row matches
     * @throws Exception if the connection cannot be opened or the query fails; the failure is
     *                   propagated instead of being turned into an empty result
     */
    private static String readHighWaterMark(String table, String oraConn, String oraUser, String oraPswd) throws Exception
    {
        String highWaterMarkValue = "";
        // The connection is kept open on purpose: setHighWaterMark reuses it and main closes it.
        con = DriverManager.getConnection(oraConn, oraUser, oraPswd);
        // Bind the table name instead of concatenating it into the SQL text.
        try (PreparedStatement stmt = con.prepareStatement(
                "select * from difa.HIGH_WATER_MARK_TABLE where table_nm=?")) {
            stmt.setString(1, table);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    highWaterMarkValue = rs.getString(3);
                }
            }
        }
        return highWaterMarkValue;
    }

    /**
     * Stores a new high water mark value for a table using the already-open connection.
     *
     * @param key   value matched against the {@code table_nm} column
     * @param value new content of the {@code high_water_mark_VALUE} column
     * @throws IllegalStateException if the connection has not been opened yet
     * @throws Exception             if the update fails
     */
    private static void setHighWaterMark(String key, String value) throws Exception
    {
        if (con == null) {
            throw new IllegalStateException("Oracle connection is not open; readHighWaterMark must run first");
        }
        try (PreparedStatement pStmt = con.prepareStatement(
                "UPDATE high_water_mark_table SET high_water_mark_VALUE=? where table_nm=?")) {
            pStmt.setString(1, value);
            pStmt.setString(2, key);
            int i = pStmt.executeUpdate();
            System.out.println(i + " records updated");
        }
    }

    /**
     * Entry point.
     *
     * @param args {@code [0]} MongoDB URI, {@code [1]} output path (deleted before writing),
     *             {@code [2]} Oracle JDBC URL, {@code [3]} Oracle user, {@code [4]} Oracle
     *             password, {@code [5]} table name in the high water mark table,
     *             {@code [6]} name of the high-water column, {@code [7]} load type
     *             ({@code "I"} for incremental)
     * @throws Exception if reading, writing or the high water mark bookkeeping fails
     */
    public static void main(final String[] args) throws Exception {
        if (args.length < 8) {
            System.out.println("Please provide correct inputs");
            System.exit(1);
        }
        String mongoAddress = args[0];
        String clusterAddress = args[1];
        String oraConn = args[2];
        String oraUser = args[3];
        String oraPswd = args[4];
        String tableNm = args[5];
        String highWaterCol = args[6];
        String loadType = args[7];

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkRecordReader")
                .config("spark.mongodb.input.uri", mongoAddress)
                .config("spark.mongodb.output.uri", mongoAddress)
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        try {
            // Best-effort removal of a previous output; a failure here is reported but not fatal.
            try {
                FileSystem fs = FileSystem.get(new URI(clusterAddress), jsc.hadoopConfiguration());
                fs.delete(new Path(clusterAddress), true);
            }
            catch (Exception e) {
                e.printStackTrace();
            }

            /* ********Read data from MongoDB******* */
            Dataset<Row> dataset = MongoSpark.load(jsc).toDF();

            if (loadType.equalsIgnoreCase("I")) {
                String highWaterMark = readHighWaterMark(tableNm, oraConn, oraUser, oraPswd);
                System.out.println("============HIGH_WATER_MARK_VALUE: " + highWaterMark);
                if (highWaterMark == null || highWaterMark.isEmpty()) {
                    throw new IllegalStateException("No high water mark value found for table " + tableNm);
                }

                // NOTE(review): Timestamp.valueOf and SimpleDateFormat both work in the JVM default
                // time zone, so the stored value round-trips consistently, but the trailing "Z" is
                // only a true UTC marker when the JVM runs in UTC - confirm this is intended.
                Timestamp oldTime = Timestamp.valueOf(highWaterMark.replace("T", " ").replace("Z", ""));

                // Fetch records whose high-water column is greater than the previous high water mark.
                // Persisted because it is both written out and aggregated below.
                Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).gt(oldTime)).persist();
                filtered.toJSON().write().text(clusterAddress);

                // MAX(high-water column) over the fetched rows; null when nothing was fetched.
                List<Timestamp> newHighWaterValue = filtered
                        .agg(max(filtered.col(highWaterCol)).alias("newHighWater"))
                        .select("newHighWater")
                        .as(Encoders.TIMESTAMP())
                        .collectAsList();
                Timestamp maxHighWaterMarkValue = newHighWaterValue.isEmpty() ? null : newHighWaterValue.get(0);

                // Store the new high water mark only if a greater value was detected.
                if (maxHighWaterMarkValue != null && maxHighWaterMarkValue.after(oldTime)) {
                    SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                    setHighWaterMark(tableNm, dtFormat.format(maxHighWaterMarkValue).replace(" ", "T").concat("Z"));
                }
            }
            else {
                dataset.toJSON().write().text(clusterAddress);
            }
        }
        finally {
            // con is null on the full-load path and when the connection attempt itself failed.
            try {
                if (con != null) {
                    con.close();
                    con = null;
                }
            }
            finally {
                jsc.close();
            }
        }
    }
}
你知道为什么
filter
和
$greater
没有正确地提取记录吗?