代码之家  ›  专栏  ›  技术社区  ›  Ged

foreachrdd中的case类导致序列化错误

  •  0
  • Ged  · 技术社区  · 6 年前

    如果我不尝试使用case类,只使用todf()或通过todf(“c1,”c2”)分配列的默认名称,那么可以在foreachrdd内创建df。

    一旦我尝试使用一个案例类,并查看了这些例子,我会得到:

    Task not serializable
    

    如果我将case类语句转换为:

    toDF() not part of RDD[CaseClass]
    

    这是一个遗留问题,但是我很好奇Spark会产生第n个序列化错误,如果它继续进入结构化的流媒体中。

    我有一个不需要拆分的RDD,可能是这个问题吗?不,运行数据块?

    编码如下:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable
    
    case class Person(name: String, age: Int) //extends Serializable // Some say inherently serializable so not required
    
    val spark = SparkSession.builder
        .master("local[4]")
        .config("spark.driver.cores", 2)
        .appName("forEachRDD")
        .getOrCreate()
    
    val sc = spark.sparkContext
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) 
    
    val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
    val QS = ssc.queueStream(rddQueue) 
    
    QS.foreachRDD(q => {
       if(!q.isEmpty) {   
          import spark.implicits._
          val q_flatMap = q.flatMap{x=>x}
          val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
          val df = q_withPerson.toDF() 
          df.show(false)
       }
     }
    )
    
    ssc.start()
    for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
     rddQueue += ssc.sparkContext.parallelize(List(c))
    } 
    ssc.awaitTermination() 
    
    1 回复  |  直到 6 年前
        1
  •  0
  •   Ged    6 年前

    虽然没有和Java一起长大,但环顾四周,我发现了该做什么,但我没有足够的专家来解释。

    我在一个数据砖块笔记本里跑步,在那里我做了原型。

    线索是

    case class Person(name: String, age: Int)
    

    在同一个数据库笔记本中。需要在一个单独的笔记本中定义当前笔记本外部的case类,从而与运行流式处理的类分离。

    推荐文章