代码之家  ›  专栏  ›  技术社区  ›  GOGO

“row+:savedState.toSeq”在StateStoreRestoreExec中做什么。doExecute?

  •  2
  • GOGO  · 技术社区  · 7 年前

    我们可以看到StateStoreRestoreExec如下。

    case class StateStoreRestoreExec(
        keyExpressions: Seq[Attribute],
        stateId: Option[OperatorStateId],
        child: SparkPlan)
      extends UnaryExecNode with StateStoreReader {
    
      override protected def doExecute(): RDD[InternalRow] = {
        val numOutputRows = longMetric("numOutputRows")
    
      child.execute().mapPartitionsWithStateStore(
        getStateId.checkpointLocation,
        operatorId = getStateId.operatorId,
        storeVersion = getStateId.batchId,
        keyExpressions.toStructType,
        child.output.toStructType,
        sqlContext.sessionState,
        Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
          val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
          iter.flatMap { row =>
            val key = getKey(row)
            val savedState = store.get(key)
            numOutputRows += 1
            row +: savedState.toSeq
          }
    }
    

    row +: savedState.toSeq . 我认为row是UnsafeRow和savedState的一个例子。toSeq是Seq的一个实例。那么我们如何操作它们呢 +: savedState.toSeq 工作

    1 回复  |  直到 6 年前
        1
  •  2
  •   Yuval Itzchakov    7 年前

    row InternalRow savedState 是一个 Option[UnsafeRow] ,它扩展了 内部行 . 这里发生的是保存的状态从 到a Seq[UnsafeRow] 然后是 一行

    当你 flatMap 在这些上面 UnsafeRow Iterator[UnsafeRow] .