代码之家  ›  专栏  ›  技术社区  ›  cscan ssice

将Spark结构化流输出写入Kafka主题

  •  1
  • cscan ssice  · 技术社区  · 6 年前

    SparkConf conf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("test");
    
    SparkSession spark = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();
    
    Dataset<Row> dataset = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "start")
            .load();
    
    StreamingQuery query = dataset
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("checkpointLocation", "checkpoint")
            .option("topic", "end")
            .start();
    
    query.awaitTermination(20000);
    

    start end

    1 回复  |  直到 6 年前
        1
  •  0
  •   user6910411    6 年前

    Dataset<Row> dataset = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", start.getTopicName())
            .option("startingOffsets", "earliest")
            .load();