代码之家  ›  专栏  ›  技术社区  ›  Mohamed Seif

使用filstream的Spark streaming文字计数不会打印结果[重复]

  •  0
  • Mohamed Seif  · 技术社区  · 7 年前

     import org.apache.spark.SparkConf
     import org.apache.spark.streaming._
     import org.apache.spark.streaming.StreamingContext
    
     object TwitterHashtagStreaming {
    
     def main(args: Array[String]) : Unit = {
    
    val conf = new SparkConf().setAppName("TwitterHashtagStreaming").setMaster("local[2]").set("spark.executor.memory","1g");
    
    val streamingC = new StreamingContext(conf,Seconds(5))
    
    val streamLines = streamingC.textFileStream("file:///home/cloudera/Desktop/wordstream")
    val words = streamLines.flatMap(_.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
    
     counts.print()
    
     streamingC.start()
     streamingC.awaitTermination()
    }
    
     }
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   ashburshui    7 年前

    请仔细参考 document :

    def textFileStream(directory: String): DStream[String]
    

    创建用于监视Hadoop兼容文件系统的输入流 值为Text,输入格式为TextInputFormat)。文件必须是 通过从另一个目录“移动”它们来写入监视目录 忽略。

    总之,它是一个变化检测器,您必须启动流媒体服务,然后将数据写入监视器目录。

    该语义将模拟 “流概念” 网络数据包将逐渐像你的文件一样收入。