
How do I access elements inside an ArrayType in a writeStream?

  •  0
  • user9404403  · Tech Community  · 6 years ago

    I am building a schema to receive some streaming data. It has an ArrayType that contains several elements. Here are my StructType and ArrayType definitions:

    import org.apache.spark.sql.types._

    val innerBody = StructType(
        StructField("value", LongType, false) ::
        StructField("spent", BooleanType, false) ::
        StructField("tx_index", LongType, false) :: Nil)

    val prev_out = StructType(StructField("prev_out", innerBody, false) :: Nil)

    val body = StructType(
        StructField("inputs", ArrayType(prev_out, false), false) ::
        StructField("out", ArrayType(innerBody, false), false) :: Nil)

    val schema = StructType(StructField("x", body, false) :: Nil)
    

    This builds a schema like the following:

    root
     |-- bit: struct (nullable = true)
     |    |-- x: struct (nullable = false)
     |    |    |-- inputs: array (nullable = false)
     |    |    |    |-- element: struct (containsNull = false)
     |    |    |    |    |-- prev_out: struct (nullable = false)
     |    |    |    |    |    |-- value: long (nullable = false)
     |    |    |    |    |    |-- spent: boolean (nullable = false)
     |    |    |    |    |    |-- tx_index: long (nullable = false)
     |    |    |-- out: array (nullable = false)
     |    |    |    |-- element: struct (containsNull = false)
     |    |    |    |    |-- value: long (nullable = false)
     |    |    |    |    |-- spent: boolean (nullable = false)
     |    |    |    |    |-- tx_index: long (nullable = false)
    

    I am trying to select the values of the "value" element in the schema as the data streams in. I am using a writeStream sink.

    val parsed = df.select("bit.x.inputs.element.prev_out.value")
    .writeStream.format("console").start() 
    

    I have the code above, but it gives an error.

    Message: cannot resolve 'bit.x.inputs.element.prev_out.value' given input columns: [key, value, timestamp, partition, offset, timestampType, topic];;

    How can I access the "value" element in this schema?

    1 answer  |  as of 6 years ago
        1
  •  1
  •   Rumesh Krishnan    6 years ago

    If you have a DataFrame like this, exploding the array first and then selecting the field will help.

    df.printSchema()
    //root
    //|-- bit: struct (nullable = true)
    //|    |-- x: struct (nullable = true)
    //|    |    |-- inputs: array (nullable = true)
    //|    |    |    |-- element: struct (containsNull = true)
    //|    |    |    |    |-- prev_out: struct (nullable = true)
    //|    |    |    |    |    |-- spent: boolean (nullable = true)
    //|    |    |    |    |    |-- tx_infex: long (nullable = true)
    //|    |    |    |    |    |-- value: long (nullable = true)
    
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    val intermediateDf: DataFrame = df.select(explode(col("bit.x.inputs")).as("interCol"))
    intermediateDf.printSchema()
    
    //root
    //|-- interCol: struct (nullable = true)
    //|    |-- prev_out: struct (nullable = true)
    //|    |    |-- spent: boolean (nullable = true)
    //|    |    |-- tx_infex: long (nullable = true)
    //|    |    |-- value: long (nullable = true)
    
    val finalDf: DataFrame = intermediateDf.select(col("interCol.prev_out.value").as("value"))
    finalDf.printSchema()
    //root
    //|-- value: long (nullable = true)
    
    
    finalDf.show()
    //+-----------+
    //|      value|
    //+-----------+
    //|12347628746|
    //|12347628746|
    //+-----------+
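
    The input columns listed in the error (key, value, timestamp, partition, offset, timestampType, topic) are the raw columns of a Kafka source, which suggests the JSON payload was never parsed into the `bit` struct before the select. The sketch below shows one way the pieces might fit together in a streaming job: parse the payload with `from_json`, explode the `inputs` array, then select the nested field. It assumes the source is Kafka, that the `value` column carries JSON matching the question's schema, and that the `spark-sql-kafka-0-10` package is on the classpath. The broker address, topic name, application name, and object name are hypothetical placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

object ArrayElementStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("array-element-stream") // hypothetical application name
      .master("local[*]")              // assumption: local run for experimentation
      .getOrCreate()

    // Same shape as the schema in the question.
    val innerBody = StructType(Seq(
      StructField("value", LongType, nullable = false),
      StructField("spent", BooleanType, nullable = false),
      StructField("tx_index", LongType, nullable = false)))
    val prevOut = StructType(Seq(
      StructField("prev_out", innerBody, nullable = false)))
    val body = StructType(Seq(
      StructField("inputs", ArrayType(prevOut, containsNull = false), nullable = false),
      StructField("out", ArrayType(innerBody, containsNull = false), nullable = false)))
    val schema = StructType(Seq(StructField("x", body, nullable = false)))

    // Assumption: a Kafka source; broker and topic are placeholders.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "transactions")
      .load()

    // Kafka delivers the payload as binary in `value`: cast it to a string,
    // then parse it into a struct column named `bit`.
    val parsed = raw.select(
      from_json(col("value").cast("string"), schema).as("bit"))

    // One output row per element of `inputs`, then pick the nested field.
    val values = parsed
      .select(explode(col("bit.x.inputs")).as("input"))
      .select(col("input.prev_out.value").as("value"))

    val query = values.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

    Note that `element` does not appear in any column path here. It is only the label `printSchema()` uses for the entries of an array, not a field name, so a path containing `.element.` cannot be resolved even after the payload has been parsed.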