代码之家  ›  专栏  ›  技术社区  ›  Arjun

如何根据传递的参数从RDD中提取值

  •  -2
  • Arjun  · 技术社区  · 6 年前

    我已经创建了一个键值RDD,但是我不知道如何从中选择值。

    val mapdf = merchantData_df.rdd.map(row => {
        val Merchant_Name = row.getString(0)
        val Display_Name = row.getString(1)
        val Store_ID_name = row.getString(2)
        val jsonString = s"{Display_Name: $Display_Name, Store_ID_name: $Store_ID_name}"
        (Merchant_Name, jsonString)
    })
    
    scala> mapdf.take(4).foreach(println)
    (Amul,{Display_Name: Amul, Store_ID_name: null})
    (Nestle,{Display_Name: Nestle, Store_ID_name: null})
    (Ace,{Display_Name: Ace , Store_ID_name: null})
    (Acme ,{Display_Name: Acme Fresh Market, Store_ID_name: Acme Markets})
    

    现在假设函数的输入字符串是 Amul ,我的预期输出 DisplayName is Amul 还有另一个功能 StoreID to return NULL .

    我怎样才能做到?

    我不想使用sparksql

    1 回复  |  直到 6 年前
        1
  •  1
  •   Arjun    6 年前

    +-----------------+-----------------+-------------+
    |Merchant_Name    |Display_Name     |Store_ID_name|
    +-----------------+-----------------+-------------+
    |Fitch            |Fitch            |null         |
    |Kids             |Kids             |null         |
    |Ace Hardware     |Ace Hardware     |null         |
    | Fresh Market    |Acme  Market     |Acme Markets |
    |Adventure        | Island          |null         |
    +-----------------+-----------------+-------------+
    

    可以用字符串参数编写函数

    import org.apache.spark.sql.functions._
    def filterRowsWithKey(key: String) = df.filter(col("Merchant_Name") === key).select("Display_Name", "Store_ID_name")
    

    并将函数调用为

    filterRowsWithKey("Fitch").show(false)
    

    会给你

    +------------+-------------+
    |Display_Name|Store_ID_name|
    +------------+-------------+
    |Fitch       |null         |
    +------------+-------------+
    

    我希望答案有帮助

    更新

    如果希望第一行作为字符串从函数返回,则可以这样做

    import org.apache.spark.sql.functions._
    def filterRowsWithKey(key: String) = df.filter(col("Merchant_Name") === key).select("Display_Name", "Store_ID_name").first().mkString(",")
    
    println(filterRowsWithKey("Fitch"))
    

    哪个应该给你

    Fitch,null
    

    如果找不到传递的键,上面的函数将引发异常,因此为了安全起见,可以使用以下函数

    import org.apache.spark.sql.functions._
    def filterRowsWithKey(key: String) = {
      val filteredDF = df.filter(col("Merchant_Name") === key).select("Display_Name", "Store_ID_name")
      if(filteredDF.count() > 0) filteredDF.first().mkString(",") else "key not found"
    }