代码之家  ›  专栏  ›  技术社区  ›  Subham Agrawal

验证列并在其他列中写入错误消息

  •  -1
  • Subham Agrawal  · 技术社区  · 6 年前

    我这样做时出错了:

    val input = spark.read.option("header", "true").option("delimiter", "\t").schema(trFile).csv(fileNameWithPath)
    
    val newSchema = trFile.add("ERROR_COMMENTS", StringType, true)
    
    // Call you custom validation function
    val validateDS = dataSetMap.map { row => validateColumns(row) }    //<== error here
    
    // Reconstruct the DataFrame with additional columns                      
    val checkedDf = spark.createDataFrame(validateDS, newSchema)
    
    def validateColumns(row: Row): Row = {
      var err_val: String = null
      val effective_date = row.getAs[String]("date")
      .................
    
      Row.merge(row, Row(err_val))
    }
    

    错误消息:

    ◾Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 
    ◾not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[org.apache.spark.sql.Row])org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]. Unspecified value parameter evidence$6
    

    这是我的模式:

    val FileSchema = StructType(
        Array(
          StructField("date", StringType),
          StructField("count", StringType),
          StructField("name", StringType)
          ))
    

    我是新来的火花,让我知道,这里的问题是什么,有什么最好的方法来实现这一点。我正在使用Spark版本2.3。

    1 回复  |  直到 6 年前
        1
  •  0
  •   Shaido MadHadders    6 年前

    使用一个 UDF 在这种情况下,您不必担心scehma更改,使用 row.getAs 等等。

    首先,将方法转换为 自定义项 功能:

    import org.apache.spark.sql.functions.udf
    
    val validateColumns = udf((date: String, count: String, name: String)){
     // error logic using the 3 column strings
      err_val
    }
    

    要将新列添加到数据帧,请使用 withColumn() ,

    val checkedDf = input.withColumn("ERROR_COMMENTS", validateColumns($"date", $"count", $"name"))