代码之家  ›  专栏  ›  技术社区  ›  Terry

在udf和日志行中使用Try-match进行错误处理时失败

  •  5
  • Terry  · 技术社区  · 6 年前

    Scala版本2.11和Spark 2.0.1。

    我有一个数据框架,在其中我在udf中执行一些操作。我希望能够运行操作,并仅在失败的行上返回错误。我还想将成功/失败作为一个附加字段返回。通过/失败可以在单独的列中。

    这就是我所尝试的:

    val df = Seq(("as", 1, "df"), ("1", 2, "3")).toDF("a", "b", "c")
    val df1 = Seq(("1", 1, "3"), ("1", 2, "3")).toDF("a", "b", "c")
    
    def myUdf = udf((i: String, j: Int, k: Int) => { 
       def test (ii:String, jj:Int, kk:Int): Try[Int] = {
         val q = i.toInt * j * k.toInt
         val m = q * i.toInt
         return (Try(q))
      }
      val q = Try(test(i, j, k)) match { 
        case Success(lines) => lines.toString
        case _ => "Failed"
      }
      q
    })
    
    # First Example
    val df2 = df.withColumn("D", myUdf($"a", $"b", $"c")) <-- This fails
    
    # Second Example 
    val df3 = df1.withColumn("D", myUdf($"a", $"b", $"c"))
    df3.show
      +---+---+---+----------+
      |  a|  b|  c|         D|
      +---+---+---+----------+
      |  1|  1|  3|Success(3)|
      |  1|  2|  3|Success(6)|
      +---+---+---+----------+
    

    1) 如何获得整数大小写的[0-9]值(而不是Success(3)和Success(6)--即删除Success和括号--3和6可以是字符)?此外,如何将成功/失败添加到每一行?

    2) 是否可以在Udf失败时使用Try match进行检查,而不在每个步骤进行错误处理?如果Udf在一个步骤上失败,我们如何进行下一次计算?注:“测试”方法中有大量计算。

    3) 全局检查udf的潜在替代方法有哪些?

    1 回复  |  直到 6 年前
        1
  •  5
  •   Shaido MadHadders    6 年前

    您可以使用 Try 但是,请注意 尝试 应环绕整个车身 test 方法,而不仅仅应用于结果(您也不应该使用 return 关键字)。使用后 match 以获得结果。

    def myUdf = udf((i: String, j: Int, k: String) => { 
      def test(ii: String, jj: Int, kk: String): Try[Int] = Try {
        val q = i.toInt * j * k.toInt
        val m = q * i.toInt
        q
      }
    
      test(i, j, k) match { 
        case Success(lines) => lines.toString
        case _ => "Failed"
      }
    })
    

    请注意 k 以及 kk 是字符串类型,因为这是您在两个测试数据帧中所拥有的类型。如果您使用 Int 并且不能隐式转换列值(例如“df”),该行将不会运行udf,您将得到 null

    使用两个数据帧的结果:

    +---+---+---+------+
    |  a|  b|  c|     D|
    +---+---+---+------+
    | as|  1| df|Failed|
    |  1|  2|  3|     6|
    +---+---+---+------+
    
    +---+---+---+---+
    |  a|  b|  c|  D|
    +---+---+---+---+
    |  1|  1|  3|  3|
    |  1|  2|  3|  6|
    +---+---+---+---+
    
    1. 可以看到,这将只给出值或 "Failed" 结果,成功被删除,即结果作为字符串返回。

    2. 测验 方法,将引发异常,该异常被 尝试 。这意味着该方法将在失败时退出,而不会继续到最后。

    3. 要查找所有失败的行,请使用 filter 方法: df2.filter($"D" === "Failed")