我们可以看到StateStoreRestoreExec如下。
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
}
}
row +: savedState.toSeq
. 我认为row是UnsafeRow和savedState的一个例子。toSeq是Seq的一个实例。那么我们如何操作它们呢
+:
savedState.toSeq
工作