代码之家  ›  专栏  ›  技术社区  ›  kingledion

如何使用Scala添加两列sparsevector?

  •  0
  • kingledion  · 技术社区  · 6 年前

    像这样的

    df.columns
    df: org.apache.spark.sql.DataFrame = [v1: SparseVector, v2: SparseVector]
    
    df.withColumn("v3", ADD_COL_FUNCTION(col(v1), col(v2)))
    
    2 回复  |  直到 6 年前
        1
  •  0
  •   kingledion    6 年前

    Spark中没有sparsevector的内置加法函数。 DenseVector 对象可以通过将它们转换为数组来处理,但是对于 SparseVector 这可能是个记忆杀手。您可以将sparsevector解释为一个映射,然后将映射“添加”到一起。

    import org.apache.spark.mllib.linalg.{SparseVector, Vectors, Vector}
    
    def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
      val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap
    
      Vectors.sparse(v1. size, 
        (map1 ++ (v2.indices zip v2.values).toMap)
          .map{ case (k, v) => k -> (v + map1.getOrElse(k, 0d))}
          .toList
      )
    
    
    val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))
    

    请注意,在Spark 1.6中,返回类型 Vectors.sparse Vector ,而在Spark 2.X中,它是 SparseVector公司 ,因此调整 addVecCols 适当地。另外,在2.X中,可以使用 ml 图书馆而不是 mllib

    在数据帧上使用这个

    val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))
    
        2
  •  0
  •   kingledion    6 年前

    这是我们解决这个问题的最终办法。

    首先,我们实现了中提供的Spark和Breeze向量之间的隐式转换 this post (注意注释中的错误修复)。这提供了 asBreeze fromBreeze 下面代码中使用的转换。

    def addVectors(v1Col: String, v2Col: String, outputCol: String)
                : DataFrame => DataFrame = {
      df: DataFrame => {
        def add(v1: SparkVector, v2: SparkVector): SparkVector =
          (v1.asBreeze + v2.asBreeze).fromBreeze
        val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
        df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
      }
    }
    

     df.transform(addVectors(col1Name, col2name, colOutName))
    

    当然,您可能希望包括一些列名称的检查,并确保输出列不会覆盖任何您不希望的内容。