代码之家  ›  专栏  ›  技术社区  ›  pooja

当连接列中有多个值时,如何在Spark scala中应用连接

  •  0
  • pooja  · 技术社区  · 6 年前

    我有两个文本文件中的数据作为

    file 1:(patient id,diagnosis code)
    +----------+-------+
    |patient_id|diag_cd|
    +----------+-------+
    |         1|  y,t,k|
    |         2|  u,t,p|
    |         3|  u,t,k|
    |         4|  f,o,k|
    |         5|  e,o,u|
    +----------+-------+
    
    file2(diagnosis code,diagnosis description) Time T1
    +-------+---------+
    |diag_cd|diag_desc|
    +-------+---------+
    |      y|      yen|
    |      t|      ten|
    |      k|      ken|
    |      u|      uen|
    |      p|      pen|
    |      f|      fen|
    |      o|      oen|
    |      e|      een|
    +-------+---------+
    

    文件2中的数据不固定,不断变化,意味着在任何给定的时间点,诊断代码Y都可以将诊断描述设置为Yen,而在其他时间点,诊断描述设置为10。例如下面

    file2 at Time T2
    +-------+---------+
    |diag_cd|diag_desc|
    +-------+---------+
    |      y|      ten|
    |      t|      yen|
    |      k|      uen|
    |      u|      oen|
    |      p|      ken|
    |      f|      pen|
    |      o|      een|
    |      e|      fen|
    +-------+---------+
    

    我必须在Spark中读取这两个文件的数据,并且只需要那些被诊断出患有UEN的患者ID。 它可以使用spark sql或scala两者来完成。

    我试着读火花壳里的文件。文件1中的两列由管道分隔。

    scala> val tes1 = sc.textFile("file1.txt").map(x => x.split('|')).filter(y => y(1).contains("u")).collect
    tes1: Array[Array[String]] = Array(Array(2, u,t,p), Array(3, u,t,k), Array(5, e,o,u))
    

    但是,由于与诊断描述相关的诊断代码在文件2中不是常量,因此必须使用连接条件。但我不知道当file1中的diag_cd列有多个值时如何应用联接。

    任何帮助都将不胜感激。

    2 回复  |  直到 6 年前
        1
  •  1
  •   Chandan Ray    6 年前

    请在下面找到答案

    //将文件1读取到数据帧中

    val file1df=spark.read.format(“csv”).option(“delimiter”,“”))
    .option(“header”,真)
    .load(“file1path”)。
    < /代码> 
    
    

    //将文件2读取到数据帧中

    val file2df=spark.read.format(“csv”).option(“delimiter”,“”))
    .option(“header”,真)
    .load(“file2path”)。
    < /代码> 
    
    

    //获取诊断描述为UEN的患者ID数据框

    file1df.join(file2df,file1df.col(“diag_cd”).contains(file2df.col(“diag_cd”)),“inner”)
    .filter(文件2df.col(“diag_desc”)==“uen”)
    。选择(“患者编号”)。显示
    < /代码> 
    
    

    //将文件2读取到数据帧中

    val file2DF = spark.read.format("csv").option("delimiter","|")
    .option("header",true)
    .load("file2path")
    

    //获取诊断描述为UEN的患者ID数据框

    file1DF.join(file2DF,file1DF.col("diag_cd").contains(file2DF.col("diag_cd")),"inner")
    .filter(file2DF.col("diag_desc") === "uen")
    .select("patient_id").show
    

    enter image description here

        2
  •  0
  •   moriarty007    6 年前
    1. 使用explode方法将表T1从format1转换为format2。

      格式1:

      file 1:(patient id,diagnosis code)
      +----------+-------+
      |patient_id|diag_cd|
      +----------+-------+
      |         1|  y,t,k|
      |         2|  u,t,p|
      +----------+-------+
      

      file 1:(patient id,diagnosis code)
      +----------+-------+
      |patient_id|diag_cd|
      +----------+-------+
      |         1|  y    |
      |         1|  t    |
      |         1|  k    |
      |         2|  u    |
      |         2|  t    |
      |         2|  p    |
      +----------+-------+
      

      代码:

      scala> val data = Seq("1|y,t,k", "2|u,t,p")
      data: Seq[String] = List(1|y,t,k, 2|u,t,p)
      
      scala> val df1 = sc.parallelize(data).toDF("c1").withColumn("patient_id", split(col("c1"), "\\|").getItem(0)).withColumn("col2", split(col("c1"), "\\|").getItem(1)).select("patient_id", "col2").withColumn("diag_cd", explode(split($"col2", "\\,"))).select("patient_id", "diag_cd")
      df1: org.apache.spark.sql.DataFrame = [patient_id: string, diag_cd: string]
      
      scala> df1.collect()
      res4: Array[org.apache.spark.sql.Row] = Array([1,y], [1,t], [1,k], [2,u], [2,t], [2,p])
      

      我在这里创建了虚拟数据用于说明。注意我们是如何使用

      scala> val df1 = sc.parallelize(data).toDF("c1").
       | withColumn("patient_id", split(col("c1"), "\\|").getItem(0)).
       | withColumn("col2", split(col("c1"), "\\|").getItem(1)).
       | select("patient_id", "col2").
       | withColumn("diag_cd", explode(split($"col2", "\\,"))).
       | select("patient_id", "diag_cd")
      

      df1:org.apache.spark.sql.dataframe=[患者编号:string,诊断编号:string]

    2. 现在可以使用-

      scala> val df2 = sc.textFile("file2.txt").map(x => (x.split(",")(0),x.split(",")(1))).toDF("diag_cd", "diag_desc")
      df2: org.apache.spark.sql.DataFrame = [diag_cd: string, diag_desc: string]
      
    3. 将DF1与DF2连接,并按要求过滤。

      df1.join(df2, df1.col("diag_cd") === df2.col("diag_cd")).filter(df2.col("diag_desc") === "ten").select(df1.col("patient_id")).collect()