我找到了答案,
这段代码是为我设计的。
package com.spark.streaming
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/**
 * Spark Streaming job that subscribes to the Kafka topic `cdc-classic`, reads
 * string keys/values, and appends the values of every non-empty micro-batch to
 * an HDFS directory as JSON.
 *
 * The master (`local[*]`), the 10-second batch interval, the Kafka settings and
 * the output path are all hard-coded in this object.
 */
object MessageStreaming {

  /** HDFS directory that receives one JSON append per non-empty batch. */
  private val OutputPath = "hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka"

  /**
   * Entry point: creates the Spark and streaming contexts, wires the Kafka
   * direct stream to the JSON sink, starts the stream and blocks until the
   * streaming context terminates.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    println("Message streaming")

    val conf = new org.apache.spark.SparkConf()
      .setMaster("local[*]")
      .setAppName("kafka-streaming")
    val context = new SparkContext(conf)
    val ssc = new StreamingContext(context, org.apache.spark.streaming.Seconds(10))

    // Explicit value type: Subscribe expects Map[String, Object]; without the
    // annotation the compiler infers a widened type from String + Class values.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka.kafka-cluster.com:9092",
      "group.id" -> "kafka-streaming-example",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      // NOTE(review): this looks like a legacy ZooKeeper-era setting that the
      // Kafka consumer presumably ignores — confirm before removing.
      "zookeeper.connection.timeout.ms" -> "200000"
    )
    val topics = Array("cdc-classic")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Drop records whose value is null before extracting the payload. The
    // filtered stream is the one that feeds the sink; previously the filter
    // result was computed but never used, so null values still reached HDFS.
    val messages = stream.filter(record => record.value() != null).map(_.value())

    // SparkSession supersedes the deprecated SQLContext constructor;
    // getOrCreate() picks up the SparkContext created above.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    messages.foreachRDD { rdd =>
      // Check emptiness first so that empty batches do not launch the print
      // job; printing an empty RDD would emit nothing anyway.
      if (!rdd.isEmpty()) {
        rdd.foreach(println)
        rdd.toDF("value")
          .coalesce(1) // single output file per batch
          .write
          .mode(SaveMode.Append)
          .json(OutputPath)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}