
How do I store Spark streaming data into HDFS on HortonWorks?

  • AmZ62210  ·  asked 6 years ago

    I am streaming data from a Kafka topic with Spark. This is the code I have tried; right now it only prints the streaming data to the console. I want to store this data as text files in HDFS.

    import _root_.kafka.serializer.DefaultDecoder
    import _root_.kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.storage.StorageLevel

    object StreamingDataNew {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
        val kafkaConf = Map(
          "metadata.broker.list" -> "localhost:9092",
          "zookeeper.connect" -> "localhost:2181",
          "group.id" -> "kafka-streaming-example",
          "zookeeper.connection.timeout.ms" -> "200000"
        )
        // Receiver-based stream; the map value is the number of consumer threads for the topic
        val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
          ssc,
          kafkaConf,
          Map("topic-one" -> 1), // subscribe to "topic-one" with one consumer thread
          StorageLevel.MEMORY_ONLY
        )
        println("printing" + lines.toString())
        // The stream yields (key, value) pairs; split each value into words
        val words = lines.flatMap { case (_, value) => value.split(" ") }
        words.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }


    I found that a DStream can be written out with `saveAsTextFiles`, but can someone spell out the steps for taking the Scala code above, connecting to the HortonWorks cluster, and storing the output in HDFS?
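
    To make the question concrete, this is roughly the kind of call I am picturing (a minimal sketch; the NameNode host, port, and output path below are placeholders, not values from my cluster):

        // Write each 10-second batch of words as text files under the given HDFS prefix.
        // Spark creates one output directory per batch interval, named <prefix>-<timestamp in ms>.<suffix>.
        words.saveAsTextFiles("hdfs://namenode-host:8020/user/someuser/streaming/words", "txt")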

    1 Reply  |  last updated 6 years ago

  •   AmZ62210    6 years ago

    I found the answer; this code worked for me.

    package com.spark.streaming

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkContext
    import org.apache.spark.sql._
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object MessageStreaming {
      def main(args: Array[String]): Unit = {
        println("Message streaming")

        val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
        val context = new SparkContext(conf)
        // 10-second micro-batches
        val ssc = new StreamingContext(context, org.apache.spark.streaming.Seconds(10))
        val kafkaParams = Map(
          "bootstrap.servers" -> "kafka.kafka-cluster.com:9092",
          "group.id" -> "kafka-streaming-example",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "auto.offset.reset" -> "latest",
          "zookeeper.connection.timeout.ms" -> "200000"
        )
        val topics = Array("cdc-classic")
        // Direct (receiver-less) stream against the Kafka 0.10 consumer API
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams))

        val content = stream.filter(x => x.value() != null)
        val sqlContext = new org.apache.spark.sql.SQLContext(context)
        import sqlContext.implicits._

        // For every micro-batch, convert the record values to a DataFrame
        // and append them as JSON files in HDFS
        stream.map(_.value).foreachRDD(rdd => {
          rdd.foreach(println)
          if (!rdd.isEmpty()) {
            rdd.toDF("value")
              .coalesce(1)
              .write
              .mode(SaveMode.Append)
              .json("hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka")
          }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
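
    A note on the write step: `coalesce(1)` collapses each micro-batch into a single output file (convenient for small batches, a bottleneck for large ones), and `SaveMode.Append` adds a new file under the same HDFS directory for every batch. The HDFS URI is specific to my cluster; substitute your NameNode host or the nameservice from `fs.defaultFS` in core-site.xml. To check the results, something like the following from a Spark 2.x spark-shell on the cluster should show the appended records:

        // Read back the JSON files the streaming job appended and inspect a few rows
        val written = spark.read.json("hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka")
        written.show(5)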