
Java Spark Mongo: filter(dataset.col(newTime).$greater(oldTime)) not running on the full dataset

  •  0
  •  aiman  ·  6 years ago

    I have written a Java Spark job that uses the Mongo connector. It is supposed to fetch from MongoDB all rows where the column createdDate is greater than the createdDate of the previous run (a high-water-mark value that I store in Oracle after every run; initially the high-water-mark value in Oracle is 1900-01-01 00:00:00.000 ).
    The column createdDate is of type ISODate in MongoDB.
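
    For reference, the Mongo Spark connector generally infers ISODate fields as Spark's timestamp type, which is what makes the Timestamp comparison below possible. A quick sketch to verify the inferred schema (assuming the column name createdDate):

    dataset.printSchema();                        // createdDate should show as: timestamp
    dataset.select("createdDate").show(5, false); // inspect a few raw values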

    In my MongoDB data, the maximum value stored in createdDate is 2018-04-11 01:43:20.165 .
    But the filter in the code does not work as expected: on the first run it sometimes fetches only up to 2018-03-30 21:48:59.519 , and then on the second or third run it fetches up to the maximum value ( 2018-04-11 01:43:20.165 ).
    Ideally, the whole dataset should be fetched on the very first run, since the initial high-water-mark value is 1900-01-01 00:00:00.000 .
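
    Note that the replace() calls in the code below imply the mark is stored in Oracle in an ISO-like form ( 1900-01-01T00:00:00.000Z ); a minimal sketch of the round trip between that string and java.sql.Timestamp:

    // Stored form assumed from the replace("T"," ").replace("Z","") calls below.
    String stored = "1900-01-01T00:00:00.000Z";
    Timestamp mark = Timestamp.valueOf(stored.replace("T", " ").replace("Z", ""));
    // Written back after each run in the same ISO-like form:
    String back = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
            .format(mark).replace(" ", "T").concat("Z");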
    The code is as follows:

    package mongo;
    
    import java.net.URI;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;
    import java.util.List;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.*;
    
    import com.mongodb.spark.MongoSpark;
    
    public final class MongoRead
    {
        private static Connection con=null;
        private static String readHighWaterMark(String table, String oraConn, String oraUser, String oraPswd) throws Exception
        {
            String highWaterMarkValue = "";
            try
            {       
                con=DriverManager.getConnection(oraConn,oraUser,oraPswd);
                Statement stmt=con.createStatement();
                ResultSet rs=stmt.executeQuery("select * from difa.HIGH_WATER_MARK_TABLE where table_nm='"+table+"'");
                while(rs.next()){
                    highWaterMarkValue = rs.getString(3);
                }
            }
            catch(Exception e){
                e.printStackTrace();
                con.close();
            }
    
            return highWaterMarkValue;
        }
    
        private static void setHighWaterMark(String key, String value) throws Exception
        {
            PreparedStatement pStmt=con.prepareStatement("UPDATE high_water_mark_table SET high_water_mark_VALUE='"+value+"' where table_nm='"+key+"'");
            int i=pStmt.executeUpdate();
            System.out.println(i+" records updated");
    
        }
    
        public static void main(final String[] args) throws Exception {
            if(args.length<8){
                System.out.println("Please provide correct inputs");
                System.exit(1);
            }
            String mongoAddress = args[0];
            String clusterAddress = args[1];
            String oraConn = args[2];
            String oraUser = args[3];
            String oraPswd = args[4];
            String tableNm = args[5];
            String highWaterCol = args[6];
            String loadType = args[7];
    
    
            SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkRecordReader")
                .config("spark.mongodb.input.uri", mongoAddress)
                .config("spark.mongodb.output.uri", mongoAddress)
                .getOrCreate();
    
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    
            try{
                FileSystem fs = FileSystem.get(new URI(clusterAddress),jsc.hadoopConfiguration());
                fs.delete(new Path(clusterAddress),true);
            }
            catch(Exception e){
                e.printStackTrace();
            }
    
    
            /* ********Read data from MongoDB******* */
            Dataset<Row> dataset = MongoSpark.load(jsc).toDF();
    
            if(loadType.equalsIgnoreCase("I")){
                String highWaterMark = readHighWaterMark(tableNm,oraConn,oraUser,oraPswd);
                System.out.println("============HIGH_WATER_MARK_VALUE: "+highWaterMark);
    
                Timestamp oldTime = Timestamp.valueOf(highWaterMark.replace("T"," ").replace("Z", ""));
            //Fetch records where createdDate is greater than the previous high-water mark.
                Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).$greater(oldTime)).persist();
                filtered.toJSON().write().text(clusterAddress);
    
                //Calculating the MAX(createdDate) in the fetched dataset.
                Dataset<Row> maxHighWaterRow = filtered.agg(max(filtered.col(highWaterCol)).alias("newHighWater")).persist();           
                List<Timestamp> newHighWaterValue = maxHighWaterRow.select("newHighWater").as(Encoders.TIMESTAMP()).collectAsList();
                Timestamp maxHighWaterMarkValue = newHighWaterValue.iterator().next();              
    
    
                SimpleDateFormat dtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                Timestamp oldDate = Timestamp.valueOf(highWaterMark.replace('T', ' ').replace("Z",""));         
                //Setting HIGH_WATER_MARK_VALUE if a greater value is detected.
                if(maxHighWaterMarkValue !=null && maxHighWaterMarkValue.after(oldDate)){
                    setHighWaterMark(tableNm,dtFormat.format(maxHighWaterMarkValue).replace(" ", "T").concat("Z"));
                }
    
            }
            else{
                dataset.toJSON().write().text(clusterAddress);
            }
    
    
            con.close();
            jsc.close();
    
        }
    }
    
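    As an aside, the JDBC helpers above build SQL by string concatenation; a parameterized variant of the update (a sketch, assuming the same table layout and the same con field) avoids quoting and injection issues:

    private static void setHighWaterMark(String key, String value) throws Exception {
        // Bind the values instead of concatenating them into the SQL string.
        PreparedStatement pStmt = con.prepareStatement(
                "UPDATE high_water_mark_table SET high_water_mark_VALUE=? WHERE table_nm=?");
        pStmt.setString(1, value);
        pStmt.setString(2, key);
        System.out.println(pStmt.executeUpdate() + " records updated");
    }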

    Any idea why the filter with $greater is not fetching the records correctly?

    1 Answer  |  last activity 6 years ago

  •  0
  •  aiman  ·  6 years ago

    I solved it by adding .persist() to the Dataset:

    /* ********Read data from MongoDB******* */
    Dataset<Row> dataset = MongoSpark.load(jsc).toDF().persist();
    ....
    ..
    ...
    Dataset<Row> filtered = dataset.filter(dataset.col(highWaterCol).$greater(oldTime)).persist();
    

    I don't know why, but without persist() the filter was not running on the whole dataset.
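
    A likely explanation (a sketch of the mechanism, not verified against this exact setup): Spark Datasets are evaluated lazily, so without persist() every action re-reads the source. The write and the max() aggregation above are two separate actions, each triggering its own scan of the Mongo collection, and the two scans can see different data. Caching pins both actions to the same materialized snapshot:

    import org.apache.spark.storage.StorageLevel;

    // Materialize the filtered rows once so every downstream action
    // reuses the same snapshot instead of re-scanning MongoDB.
    Dataset<Row> filtered = dataset
            .filter(dataset.col(highWaterCol).$greater(oldTime))
            .persist(StorageLevel.MEMORY_AND_DISK());

    filtered.toJSON().write().text(clusterAddress);       // action 1: fills the cache
    Row maxRow = filtered.agg(max(highWaterCol)).first(); // action 2: reuses the cache
    filtered.unpersist();                                 // release once the mark is saved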