我正在建立一个模式来接受一些数据流。它有一个包含一些元素的ArrayType。以下是我的StructType和ArrayType:
val innerBody = StructType(
StructField("value", LongType, false) ::
StructField("spent", BooleanType, false) ::
StructField("tx_index", LongType, false) :: Nil)
val prev_out = StructType(StructField("prev_out", innerBody, false) :: Nil)
val body = StructType(
StructField("inputs", ArrayType(prev_out, false), false) ::
StructField("out", ArrayType(innerBody, false), false) :: Nill)
val schema = StructType(StructField("x", body, false) :: Nil)
这将构建一个类似“的架构”
root
|-- bit: struct (nullable = true)
| |-- x: struct (nullable = false)
| | |-- inputs: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- prev_out: struct (nullable = false)
| | | | | |-- value: long (nullable = false)
| | | | | |-- spent: boolean (nullable = false)
| | | | | |-- tx_index: long (nullable = false)
| | |-- out: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- value: long (nullable = false)
| | | | |-- spent: boolean (nullable = false)
| | | | |-- tx_index: long (nullable = false)
我正在尝试从模式中的“value元素”中选择值,因为它正在流入。我正在使用writeStream接收器。
val parsed = df.select("bit.x.inputs.element.prev_out.value")
.writeStream.format("console").start()
我有上面的代码,但给出了一个错误。
消息:无法解析'
bit.x.inputs.element.prev_out.value
'给定
输入列:[键、值、时间戳、分区、偏移、,
时间戳类型,主题];;
如何访问此架构中的“value”元素?