代码之家  ›  专栏  ›  技术社区  ›  jarry jafery

SparkStreaming:DirectStream RDD到数据帧[重复]

  •  0
  • jarry jafery  · 技术社区  · 6 年前

    我正在研究spark流上下文,它从avro序列化中的kafka主题获取数据,如下所示。

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "schema.registry.url" -> "http://localhost:8081",
      "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
      "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
      "group.id" -> "1"
    )
    

    使用Kafka utils,我创建了如下的直接流

    val topics = Set("mysql-foobar")
    
    
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String,String](
        topics,
        kafkaParams)
    )
    

    我还将数据作为

    stream.foreachRDD ( rdd => {
      rdd.foreachPartition(iterator => {
        while (iterator.hasNext) {
          val next = iterator.next()
          println(next.value())
        }
      })
    })
    

    this this 我的输出如下

    {"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   OneCricketeer Gabriele Mariotti    6 年前

    由于您使用的是合流序列化程序,而且它们目前不提供与Spark的轻松集成,因此您可以通过AbsaOSS在Github上签出一个相对较新的库来帮助实现这一点。

    你可以找到 examples of what you're looking for here

    另请参见 Integrating Spark Structured Streaming with the Kafka Schema Registry