
Converting a Spark RDD to a Dataset

  • Jice · asked 7 years ago

    After some text mining, I am trying to run a k-means clustering, but I can't figure out how to transform the result of ParseWikipedia.termDocumentMatrix into the Dataset that the KMeans.fit method requires.

    scala> val (termDocMatrix, termIds, docIds, idfs) = ParseWikipedia.termDocumentMatrix(lemmas, stopWords, numTerms, sc)
    scala> val kmeans = new KMeans().setK(5).setMaxIter(200).setSeed(1L)
    scala> termDocMatrix.take(1)
    res24: Array[org.apache.spark.mllib.linalg.Vector] = Array((1000,[32,166,200,223,577,645,685,873,926],[0.18132966949934762,0.3777537726516676,0.3178848913768969,0.43380819546465704,0.30604090845847254,0.46007361524957147,0.2076406414508386,0.2995665853335863,0.1742843713808876]))
    
    scala> val modele = kmeans.fit(termDocMatrix)
    <console>:66: error: type mismatch;
     found   : org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
     required: org.apache.spark.sql.Dataset[_]
           val modele = kmeans.fit(termDocMatrix)
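
    (Note: the ml KMeans estimator expects a Dataset or DataFrame with a vector column, by default named "features", holding org.apache.spark.ml.linalg.Vector values, not an RDD of mllib vectors. A minimal sketch of the input shape it accepts, with made-up data:)

    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.linalg.Vectors   // ml vectors, not mllib

    // Minimal sketch of the shape KMeans.fit accepts: a DataFrame with a
    // "features" column of ml vectors (the default featuresCol).
    val df = spark.createDataFrame(Seq(
      Vectors.dense(0.0, 1.0),
      Vectors.dense(1.0, 0.0)
    ).map(Tuple1.apply)).toDF("features")

    val model = new KMeans().setK(2).setSeed(1L).fit(df)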
    

    I tried a few conversions, but I always get an error:

    scala> import spark.implicits._
    import spark.implicits._
    
    scala> val ss=org.apache.spark.sql.SparkSession.builder().getOrCreate()
    scala> ss.createDataset(termDocMatrix)
    <console>:67: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       ss.createDataset(termDocMatrix)
    

    And some other attempts (with the expected results, since they don't produce a Dataset):

    import org.apache.spark.sql.types.{StructField, StructType}
    import org.apache.spark.ml.linalg.SQLDataTypes.VectorType   // the ml VectorUDT (see the error below)

    val termDocRows = termDocMatrix.map(org.apache.spark.sql.Row(_))
    val schemaVecteurs = StructType(Seq(StructField("features", VectorType, true)))
    val termDocVectors = spark.createDataFrame(termDocRows, schemaVecteurs)
    val termDocMatrixDense = termDocMatrix.map(e => e.toDense)
    

    (and tried kmeans.fit on each of them). The only one that gives a different error is termDocVectors:

    val modele = kmeans.fit(termDocVectors)
    18/01/05 01:14:52 ERROR Executor: Exception in task 0.0 in stage 560.0 (TID 1682)
    java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.apache.spark.mllib.linalg.SparseVector is not a valid external type for schema of vector
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.ml.linalg.VectorUDT).serialize AS features#75
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
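
    (The VectorType used in the schema above is presumably the ml VectorUDT from org.apache.spark.ml.linalg.SQLDataTypes, while the rows still carry mllib vectors, hence the "not a valid external type" error. A hedged sketch of the same approach with the vectors converted via .asML first:)

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType}
    import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

    // Convert each mllib vector to its ml counterpart before wrapping it in a Row,
    // so the external type matches the ml VectorUDT declared in the schema.
    val schemaVecteurs = StructType(Seq(StructField("features", VectorType, true)))
    val termDocRowsML = termDocMatrix.map(v => Row(v.asML))
    val termDocVectorsML = spark.createDataFrame(termDocRowsML, schemaVecteurs)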
    

    Does anyone have a clue? Thanks for your help.

    Also, after testing the clues provided:

    Where am I supposed to apply toDS?

    scala> termDocMatrix.toDS
    <console>:69: error: value toDS is not a member of org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
       termDocMatrix.toDS
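
    (toDS is provided by spark.implicits._, but like createDataset it needs an encoder for the element type, so it is not available on an RDD[Vector]; it does appear once the vectors are wrapped in Tuple1, as in this sketch:)

    // toDS becomes available once the element type has an encoder (Tuple1 of a vector does).
    val dsTuple = termDocMatrix.map(Tuple1.apply).toDS.withColumnRenamed("_1", "features")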
    

    Using tuples...
    I still get an error (a different one this time):

    val ds = spark.createDataset(termDocMatrix.map(Tuple1.apply)).withColumnRenamed("_1", "features")
    ds: org.apache.spark.sql.DataFrame = [features: vector]
    scala> val modele = kmeans.fit(ds)
    java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.
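
    (This last error is just the mllib/ml split: the column holds org.apache.spark.mllib.linalg.Vector values while ml's KMeans wants org.apache.spark.ml.linalg.Vector. One hedged way around it is to convert the column with MLUtils:)

    import org.apache.spark.mllib.util.MLUtils

    // Convert the mllib vector column of the Dataset to its ml equivalent.
    val dsML = MLUtils.convertVectorColumnsToML(ds, "features")
    val modele = kmeans.fit(dsML)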
    

    The original problem seems to be solved. Now I am facing a new one: I compute the SVD from an mllib RowMatrix, and KMeans seems to expect ml vectors. I just need to find out how to compute the SVD in the ml package...
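
    (On that remaining point: in Spark 2.x the SVD is still computed in mllib, through RowMatrix.computeSVD; one hedged option is to keep computing it there and bridge the resulting rows to ml vectors with .asML before handing them to KMeans. A sketch, with k chosen arbitrarily:)

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // SVD via mllib's RowMatrix; U holds one row (document) per input vector.
    val mat = new RowMatrix(termDocMatrix)
    val svd = mat.computeSVD(k = 100, computeU = true)

    // Bridge the mllib rows of U to ml vectors for the ml KMeans.
    val docConcepts = spark.createDataset(svd.U.rows.map(v => Tuple1(v.asML)))
      .withColumnRenamed("_1", "features")
    val modele = kmeans.fit(docConcepts)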

    1 Answer  |  last active 7 years ago
    Leo C · answered 7 years ago

    Spark's Dataset API doesn't provide an encoder for org.apache.spark.mllib.linalg.Vector. That said, you can try converting the RDD of MLlib vectors into a Dataset by first mapping the vectors to Tuple1s, as in the example below, to see whether your ML model accepts it:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    
    val termDocMatrix = sc.parallelize(Array(
      Vectors.sparse(
        1000, Array(32, 166, 200, 223, 577, 645, 685, 873, 926), Array(
          0.18132966949934762, 0.3777537726516676, 0.3178848913768969,
          0.43380819546465704, 0.30604090845847254, 0.46007361524957147,
          0.2076406414508386, 0.2995665853335863, 0.1742843713808876
      )),
      Vectors.sparse(
        1000, Array(74, 154, 343, 405, 446, 538, 566, 612 ,732), Array(
          0.12128098267647237, 0.2499114848264329, 0.1626128536458679,
          0.12167467201712565, 0.2790928578869498, 0.24904429178306794,
          0.10039172907499895, 0.22803472531961744, 0.36408630055671115
      ))
    ))
    // termDocMatrix: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ...
    
    val ds = spark.createDataset(termDocMatrix.map(Tuple1.apply)).
      withColumnRenamed("_1", "features")
    // ds: org.apache.spark.sql.DataFrame = [features: vector]
    
    ds.show
    // +--------------------+
    // |            features|
    // +--------------------+
    // |(1000,[32,166,200...|
    // |(1000,[74,154,343...|
    // +--------------------+
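
    As the edit to the question shows, the column built this way still holds mllib vectors, so ml's KMeans rejects it with a VectorUDT mismatch. A variant of the same Tuple1 trick that maps each vector through .asML should give a column of ml vectors that KMeans accepts (a sketch on the sample data above, with k kept at 2 since there are only two rows):

    import org.apache.spark.ml.clustering.KMeans

    // Same Tuple1 trick, but with the vectors bridged to the ml package first.
    val dsML = spark.createDataset(termDocMatrix.map(v => Tuple1(v.asML))).
      withColumnRenamed("_1", "features")
    // dsML: org.apache.spark.sql.DataFrame = [features: vector]   (ml vectors this time)

    val model = new KMeans().setK(2).setSeed(1L).fit(dsML)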