以下是我星火工作的一部分:
def parse(evt: Event): String = {
try {
val config = new java.util.HashMap[java.lang.String, AnyRef] // Line1
config.put("key", "value") // Line2
val decoder = new DeserializerHelper(config, classOf[GenericRecord]) // Line3
val payload = decoder.deserializeData(evt.getId, evt.toBytes)
val record = payload.get("data")
record.toString
} catch {
case e :Exception => "exception:" + e.toString
}
}
try {
val inputStream = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe[String, String](Array(inputTopic), kafkaParams)
)
val processedStream = inputStream.map(record => parse(record.value()))
processedStream.print()
} finally {
}
如果我把上面代码中的第1-3行移到外面
parse()
函数,我得到
Caused by: java.io.NotSerializableException: SchemaDeserializerHelper
Serialization stack:
- object not serializable (class: SchemaDeserializerHelper, value: SchemaDeserializerHelper@2e23c180)
- field (class: App$$anonfun$1, name: decoder$1, type: class SchemaDeserializerHelper)
- object (class App$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 22 more
为什么?我不喜欢把第1~3行放在
解析()
功能,
如何优化?
谢谢