代码之家  ›  专栏  ›  技术社区  ›  Tom Lous

Mongoexport在Spark中严格加载json

  •  1
  • Tom Lous  · 技术社区  · 7 年前

    mongoexport . documentation 提到所有json输出都在 严格的 模式

    "{amount":{"$numberLong":"3"},"count":{"$numberLong":"245"}}
    

    case class MongoData(amount: Long, count: Long)
    

    读取数据当然会失败,如下所示:

    spark
          .read
          .json(inputPath)
          .as[MongoData]
    

    有没有办法从mongo导出而不使用严格模式,或者在Scala中导入json而不手动将每个字段重新构造为适当的结构?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Tom Lous    7 年前

    我现在用这个作为解决方案。但感觉有点粗糙。

    case class DataFrameExtended(dataFrame: DataFrame) {
    
       def undoMongoStrict(): DataFrame = {
        val numberLongType = StructType(List(StructField("$numberLong", StringType, true))) 
    
        def restructure(fields: Array[StructField], nesting: List[String] = Nil): List[Column] = {
          fields.flatMap(field => {
            val fieldPath = nesting :+ field.name
            val fieldPathStr = fieldPath.mkString(".")
            field.dataType match {
              case dt: StructType if dt == numberLongType =>
                Some(col(s"$fieldPathStr.$$numberLong").cast(LongType).as(field.name))
              case dt: StructType =>
                Some(struct(restructure(dt.fields, fieldPath): _*).as(field.name))
              case _ => Some(col(fieldPathStr).as(field.name))
              //              case dt:ArrayType => //@todo handle other DataTypes Array??
            }
          })
        }.toList
    
    
        dataFrame.select(restructure(dataFrame.schema.fields): _*)
      }
    }
    
    implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
      DataFrameExtended(df)
    }
    
    spark
      .read
      .json(inputPath)
      .undoMongoStrict()