代码之家  ›  专栏  ›  技术社区  ›  kingledion

如何将sparsevectors传递到pyspark中的“mllib”

  •  2
  • kingledion  · 技术社区  · 6 年前

    我正在用pyspark 1.6.3通过齐柏林飞艇和python 3.5。

    我正在尝试使用pyspark实现潜在的dirichlet分配 CountVectorizer LDA 功能。首先,问题是:这是我使用的代码。让 df “标记化”列中带有标记化文本的Spark数据框

    vectors = 'vectors'
    cv = CountVectorizer(inputCol = 'tokenized', outputCol = vectors)
    model = cv.fit(df)
    df = model.transform(df)
    
    corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
    ldaModel = LDA.train(corpus, k=25)
    

    此代码或多或少取自 pyspark api docs . 随时待命 LDA 我得到以下错误:

    net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    

    这个 internet 告诉我这是由于类型不匹配造成的。

    所以我们来看看 LDA 以及来自 计数矢量器 . 从Spark Docs这里有另一个 example 一个稀疏的向量进入 LDA :

    >>> from pyspark.mllib.linalg import Vectors, SparseVector
    >>> data = [
    ...     [1, Vectors.dense([0.0, 1.0])],
    ...     [2, SparseVector(2, {0: 1.0})],
    ... ]
    >>> rdd =  sc.parallelize(data)
    >>> model = LDA.train(rdd, k=2, seed=1)
    

    我自己执行这个,这就是 rdd 看起来像:

    >> testrdd.take(2)
    
    [[1, DenseVector([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})]]
    

    另一方面,如果我使用原始代码并查看 corpus 输出为的RDD 计数矢量器 ,我看到(编辑以删除无关位):

    >> corpus.take(3)
    
    [[0, Row(vectors=SparseVector(130593, {0: 30.0, 1: 13.0, ...
     [1, Row(vectors=SparseVector(130593, {0: 52.0, 1: 44.0, ...
     [2, Row(vectors=SparseVector(130593, {0: 14.0, 1: 6.0, ...
    ]
    

    所以我使用的例子(来自文档!)不会产生(index,sparsevector)的元组,而是(index,row(sparsevector))。还是什么?

    问题:

    • Sparsevector周围的行包装是什么导致了这个错误?
    • 如果是这样,如何除去行对象?row是df的属性,但我使用 df.rdd 转换为RDD;我还需要做什么?
    1 回复  |  直到 6 年前
        1
  •  1
  •   mayank agrawal    6 年前

    可能是问题所在。只是提取 vectors 来自 Row 对象。

    corpus = df.select(vectors).rdd.zipWithIndex().map(lambda x: [x[1], x[0]['vectors']]).cache()