代码之家  ›  专栏  ›  技术社区  ›  Aamir

如何将RDD[(字符串,Iterable[VertexId])]转换为数据帧?

  •  2
  • Aamir  · 技术社区  · 6 年前

    我创建了一个 RDD Graphx 看起来是这样的:

    val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
    var s: VertexRDD[VertexId] = graph.connectedComponents().vertices
    
    val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2) map { case (x, y) =>
      val rand = randomUUID().toString
      val clusterList: Iterable[VertexId] = y.map(_._1)
      (rand, clusterList)
    }
    

    nodeGraph 是一种 RDD[(String, Iterable[VertexId])] ,其中的数据将采用以下格式:

    (abc-def11, Iterable(1,2,3,4)), 
    (def-aaa, Iterable(10,11)), 
    ...
    

    我现在要做的是用它创建一个数据帧,应该是这样的:

    col1        col2
    abc-def11   1
    abc-def11   2
    abc-def11   3
    abc-def11   4
    def-aaa     10
    def-aaa     11
    

    1 回复  |  直到 6 年前
        1
  •  3
  •   Shaido MadHadders    6 年前

    首先,使用 toDF() ,以及您想要的列名。这是最容易做到的改变 Iterable[VertexId] Seq[Long] 第一

    import spark.implicits._
    val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")
    

    请注意,这可以在创建时完成 nodeGraph 保存一个步骤。接下来,使用 explode

    val df2 = df.withColumn("col2", explode($"col2"))
    

    这将为您提供所需的输出。