
java.io.NotSerializableException in Spark

  •  BAE · asked 6 years ago

    Here is part of my Spark job:

    import org.apache.avro.generic.GenericRecord

    // Deserialize one event and return the "data" field as a string.
    def parse(evt: Event): String = {
      try {
        val config = new java.util.HashMap[java.lang.String, AnyRef] // Line1
        config.put("key", "value") // Line2
        val decoder = new DeserializerHelper(config, classOf[GenericRecord]) // Line3
        val payload = decoder.deserializeData(evt.getId, evt.toBytes)
        val record = payload.get("data")
        record.toString
      } catch {
        case e: Exception => "exception:" + e.toString
      }
    }
    
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    try {
      val inputStream = KafkaUtils.createDirectStream(
        ssc,
        PreferConsistent,
        Subscribe[String, String](Array(inputTopic), kafkaParams)
      )
      val processedStream = inputStream.map(record => parse(record.value()))
      processedStream.print()
    } finally {
    }
    

    If I move Line1-Line3 of the code above outside the parse() function, the job fails with the stack trace shown below.
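    Presumably the moved-out variant looks roughly like this (a sketch, not the original code; Event, DeserializerHelper, and the config values are taken from the snippet above):

    // Line1-Line3 hoisted out of parse(): decoder is now created on the
    // driver and becomes a free variable of the closure passed to map(),
    // so Spark has to serialize it when shipping the closure to executors.
    val config = new java.util.HashMap[java.lang.String, AnyRef] // Line1
    config.put("key", "value") // Line2
    val decoder = new DeserializerHelper(config, classOf[GenericRecord]) // Line3

    def parse(evt: Event): String = {
      try {
        val payload = decoder.deserializeData(evt.getId, evt.toBytes)
        payload.get("data").toString
      } catch {
        case e: Exception => "exception:" + e.toString
      }
    }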

    Caused by: java.io.NotSerializableException: SchemaDeserializerHelper
    Serialization stack:
        - object not serializable (class: SchemaDeserializerHelper, value: SchemaDeserializerHelper@2e23c180)
        - field (class: App$$anonfun$1, name: decoder$1, type: class SchemaDeserializerHelper)
        - object (class App$$anonfun$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
        ... 22 more
    

    Why does this happen? I don't want to keep Line1-Line3 inside the parse() function, since that rebuilds the config and the decoder for every single record. How can I restructure this?
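    The trace itself points at the cause: the closure passed to map (App$$anonfun$1) has captured decoder as the field decoder$1, Spark must serialize the whole closure to ship it to executors, and SchemaDeserializerHelper does not implement java.io.Serializable. A common way to avoid both the per-record construction and the serialization problem is to build the decoder once per partition with mapPartitions, so it is constructed on the executor and never leaves it. A minimal sketch, assuming the same classes as above and keeping the question's parse(record.value()) call unchanged:

    // Build the decoder once per partition, on the executor, instead of
    // once per record (original code) or once on the driver (failing code).
    val processedStream = inputStream.mapPartitions { records =>
      val config = new java.util.HashMap[java.lang.String, AnyRef] // Line1
      config.put("key", "value") // Line2
      val decoder = new DeserializerHelper(config, classOf[GenericRecord]) // Line3

      def parse(evt: Event): String =
        try {
          decoder.deserializeData(evt.getId, evt.toBytes).get("data").toString
        } catch {
          case e: Exception => "exception:" + e.toString
        }

      records.map(record => parse(record.value()))
    }

    An alternative with the same effect is to hold the decoder in a lazy val inside a top-level object: each executor JVM initializes the object locally on first use, so the decoder is built once per JVM and is never serialized.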

    Thanks

    0 replies  |  6 years ago