
Update a dataframe column by comparing against existing data in another column using the Levenshtein algorithm

  •  1
  • bboy  · Tech Community  · 6 years ago

    How can I use the Levenshtein algorithm to update the m_name column so that the null values are replaced?

    +--------------------+--------------------+-------------------+
    |       original_name|              m_name|            created|
    +--------------------+--------------------+-------------------+
    |            New York|            New York|2017-08-01 09:33:40|
    |            new york|                null|2017-08-01 15:15:06|
    |       New York city|                null|2017-08-01 15:15:06|
    |          california|          California|2017-09-01 09:33:40|
    | California,000IU...|                null|2017-09-01 01:40:00|
    |         Californiya|          California|2017-09-01 11:38:21|
    +--------------------+--------------------+-------------------+

    For each original_name, the closest m_name value should be taken, as determined by an algorithm based on the Levenshtein distance (edit distance).

    similarity(s1,s2) = [max(len(s1), len(s2)) − editDistance(s1,s2)] / max(len(s1), len(s2))
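
    To make the formula concrete, here is a minimal plain-Scala sketch of it (no Spark needed). The object and function names are hypothetical and chosen only for illustration, and treating two empty strings as fully similar is an assumption the formula itself does not cover.

```scala
// Hypothetical helper names; a sketch of the formula, not a definitive implementation.
object SimilaritySketch {
  // Dynamic-programming Levenshtein distance that keeps only two rows of the table.
  def editDistance(s1: String, s2: String): Int = {
    // Row 0: turning the empty prefix of s1 into the first j chars of s2 costs j insertions.
    var prev = Array.range(0, s2.length + 1)
    for (i <- 1 to s1.length) {
      val curr = new Array[Int](s2.length + 1)
      curr(0) = i // deleting the first i chars of s1
      for (j <- 1 to s2.length) {
        val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
        curr(j) = math.min(math.min(prev(j) + 1, curr(j - 1) + 1), prev(j - 1) + cost)
      }
      prev = curr
    }
    prev(s2.length)
  }

  // similarity = (max(len(s1), len(s2)) - editDistance(s1, s2)) / max(len(s1), len(s2))
  def similarity(s1: String, s2: String): Double = {
    val longest = math.max(s1.length, s2.length)
    // Assumption: two empty strings count as identical, which also avoids dividing by zero.
    if (longest == 0) 1.0
    else (longest - editDistance(s1, s2)).toDouble / longest
  }
}
```

    For example, "New York city" and "New York" differ by the five trailing characters, so the edit distance is 5 and the similarity is (13 − 5) / 13 ≈ 0.615.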
    

    The "ideal" final result should look like this:

    +--------------------+--------------------+-------------------+
    |       original_name|              m_name|            created|
    +--------------------+--------------------+-------------------+
    |            New York|            New York|2017-08-01 09:33:40|
    |            new york|            New York|2017-08-01 15:15:06|
    |       New York city|            New York|2017-08-01 15:15:06|
    |          california|          California|2017-09-01 09:33:40|
    | California,000IU...|          California|2017-09-01 01:40:00|
    |         Californiya|          California|2017-09-01 11:38:21|
    +--------------------+--------------------+-------------------+
    
    1 reply  |  as of 6 years ago
  •  2
  •   Ramesh Maharjan    6 years ago

    Credit to Rosetta Code's Levenshtein_distance page.

    You can do the following (the code is commented for clarity and explanation):

    // Collect the distinct m_name values into a list, drop the "null" strings, and broadcast the list for use inside the UDF
    import org.apache.spark.sql.functions._
    val collectedList = df.select(collect_set("m_name")).rdd.collect().flatMap(row => row.getAs[Seq[String]](0).filterNot(_ == "null")).toList
    val broadcastedList = sc.broadcast(collectedList)
    
    // Levenshtein distance (edit distance) implementation
    import scala.math.{min => mathmin, max => mathmax}
    def minimum(i1: Int, i2: Int, i3: Int) = mathmin(mathmin(i1, i2), i3)
    
    def editDistance(s1: String, s2: String) = {
      val dist = Array.tabulate(s2.length + 1, s1.length + 1) { (j, i) => if (j == 0) i else if (i == 0) j else 0 }
    
      for (j <- 1 to s2.length; i <- 1 to s1.length)
        dist(j)(i) = if (s2(j - 1) == s1(i - 1)) dist(j - 1)(i - 1)
        else minimum(dist(j - 1)(i) + 1, dist(j)(i - 1) + 1, dist(j - 1)(i - 1) + 1)
    
      dist(s2.length)(s1.length)
    }
    
    // UDF: compute the Levenshtein distance from original_name to every broadcast candidate
    // and return the closest match; a null original_name yields null instead of throwing
    val levenshteinUdf = udf((str1: String) => {
      if (str1 == null) null
      else {
        val distances = for (str2 <- broadcastedList.value) yield (str2, editDistance(str1.toLowerCase, str2.toLowerCase))
        distances.minBy(_._2)._1
      }
    })
    
    
    //calling the udf function when m_name is null
    df.withColumn("m_name", when(col("m_name").isNull || col("m_name") === "null", levenshteinUdf(col("original_name"))).otherwise(col("m_name"))).show(false)
    

    This should give you:

    +-------------------+----------+-------------------+
    |original_name      |m_name    |created            |
    +-------------------+----------+-------------------+
    |New York           |New York  |2017-08-01 09:33:40|
    |new york           |New York  |2017-08-01 15:15:06|
    |New York city      |New York  |2017-08-01 15:15:06|
    |california         |California|2017-09-01 09:33:40|
    |California,000IU...|California|2017-09-01 01:40:00|
    |Californiya        |California|2017-09-01 11:38:21|
    +-------------------+----------+-------------------+
    

    Note: I did not use your similarity(s1,s2) = [max(len(s1), len(s2)) − editDistance(s1,s2)] / max(len(s1), len(s2)) logic, because it was giving wrong output.
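
    If you want to compare the two selection rules on your own data before running anything in Spark, the following plain-Scala sketch may help. It picks a candidate once by smallest edit distance (what the UDF above does) and once by largest similarity (the formula from the question). All names here are hypothetical, the empty-string handling is an assumption, and the sketch does not attempt to reproduce the wrong output mentioned in the note.

```scala
// Hypothetical names for illustration; plain Scala, no Spark required.
object ClosestMatchSketch {
  // Two-row dynamic-programming Levenshtein distance.
  def editDistance(s1: String, s2: String): Int = {
    var prev = Array.range(0, s2.length + 1)
    for (i <- 1 to s1.length) {
      val curr = new Array[Int](s2.length + 1)
      curr(0) = i
      for (j <- 1 to s2.length) {
        val cost = if (s1(i - 1) == s2(j - 1)) 0 else 1
        curr(j) = math.min(math.min(prev(j) + 1, curr(j - 1) + 1), prev(j - 1) + cost)
      }
      prev = curr
    }
    prev(s2.length)
  }

  // Normalized similarity from the question; two empty strings count as identical (assumption).
  def similarity(s1: String, s2: String): Double = {
    val longest = math.max(s1.length, s2.length)
    if (longest == 0) 1.0 else (longest - editDistance(s1, s2)).toDouble / longest
  }

  // Candidate with the smallest case-insensitive edit distance.
  def closestByDistance(name: String, candidates: Seq[String]): String =
    candidates.minBy(c => editDistance(name.toLowerCase, c.toLowerCase))

  // Candidate with the largest case-insensitive normalized similarity.
  def closestBySimilarity(name: String, candidates: Seq[String]): String =
    candidates.maxBy(c => similarity(name.toLowerCase, c.toLowerCase))
}
```

    With the candidates Seq("New York", "California"), both functions return "New York" for "New York city" and "California" for "California,000IU...". The two rules can disagree when candidates differ a lot in length, because the similarity divides by the longer of the two strings.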