
Update or insert a stream with the Spark Cassandra Connector

  • Yunus Einsteinium  · Tech Community  · 5 years ago

    With the Spark Cassandra Connector, all of the stream data is always inserted into the Cassandra database without any problem. That is not the result I want, though.

    What I want is to update the existing row when the employeetitle column matches, and to insert a new row otherwise.

    Here is what I have so far:

    // Create direct kafka stream with brokers and topics
        JavaInputDStream<ConsumerRecord<String, Loan>> messages = KafkaUtils.createDirectStream(
                javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
        JavaDStream<Loan> loanDStream = messages.map(record -> record.value());
        loanDStream.foreachRDD((loanJavaRDD, time) -> {
            System.out.println("Count "+loanJavaRDD.count());
        });
        JavaDStream<Loan> window = loanDStream.window(Durations.minutes(1), Durations.seconds(10));
        JavaPairDStream<String, BigDecimal> employeeTitleLoanPair = window
                .mapToPair(loan -> new Tuple2<>(loan.getEmployeeTitle(), loan.getLoanAmount()))
                .reduceByKey((bigDecimal, bigDecimal2) -> bigDecimal.add(bigDecimal2));
        employeeTitleLoanPair.print();
        // Map Cassandra table column
        Map<String, String> columnNameMappings = new HashMap<String, String>();
        columnNameMappings.put("id", "id");
        columnNameMappings.put("employeeTitle", "employeetitle");
        columnNameMappings.put("totalLoan", "totalloan");
    
        employeeTitleLoanPair.foreachRDD((pairsRDD, time) -> {
            CassandraJavaUtil
                    .javaFunctions(pairsRDD
                            .map(pair -> new EmployeeLoan(UUID.randomUUID(), pair._1, pair._2))
                            .filter(employeeLoan -> !employeeLoan.getEmployeeTitle().equals("")))
                    .writerBuilder("loan_keyspace", "emp_title_loans", CassandraJavaUtil.mapToRow(EmployeeLoan.class, columnNameMappings))
                    .saveToCassandra();
        });
    
        // Start the computation
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    

    How can I check whether the column matches and then update or insert?
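
    One possible direction, offered as a sketch rather than a confirmed fix: Cassandra treats a write to an existing primary key as an overwrite, so a fresh UUID.randomUUID() per record will always produce a new row. The table schema is not shown above, so the example assumes that id is the only primary key column of emp_title_loans. Under that assumption, deriving the id from the employee title makes repeated writes for the same title land on the same row. The class name StableLoanKey and the helper idForTitle are hypothetical, and a HashMap stands in for the table so the example runs without a cluster.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class StableLoanKey {

    // Hypothetical helper: the same title always yields the same
    // name-based (version 3) UUID, unlike UUID.randomUUID().
    static UUID idForTitle(String employeeTitle) {
        return UUID.nameUUIDFromBytes(employeeTitle.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Stand-in for the table (assumption: id is the whole primary key).
        // Writing an existing key overwrites the value instead of adding an entry.
        Map<UUID, BigDecimal> table = new HashMap<>();
        table.put(idForTitle("Engineer"), new BigDecimal("1000"));
        table.put(idForTitle("Teacher"), new BigDecimal("500"));
        table.put(idForTitle("Engineer"), new BigDecimal("1750")); // same key, so an update

        // Two distinct titles were written, so two entries remain.
        System.out.println(table.size());
    }
}
```

    In the pipeline above this would mean building the row as new EmployeeLoan(idForTitle(pair._1), pair._1, pair._2) instead of passing UUID.randomUUID(). If the schema can be changed, declaring employeetitle itself as the primary key would give the same effect without a derived id.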
