代码之家  ›  专栏  ›  技术社区  ›  Daniel Severo

嵌套数组中的PySpark反向StringIndexer

  •  6
  • Daniel Severo  · 技术社区  · 7 年前

    我正在使用PySpark使用ALS进行协作过滤。我的原始用户和项目id是字符串,所以我使用了 StringIndexer 将其转换为数字指数(Pypark的ALS模型要求我们这样做)。

    recs = (
        model
        .recommendForAllUsers(3)
    )
    

    这个 recs dataframe看起来是这样的:

    +-----------+--------------------+
    |userIdIndex|     recommendations|
    +-----------+--------------------+
    |       1580|[[10096,3.6725707...|
    |       4900|[[10096,3.0137873...|
    |       5300|[[10096,2.7274625...|
    |       6620|[[10096,2.4493625...|
    |       7240|[[10096,2.4928937...|
    +-----------+--------------------+
    only showing top 5 rows
    
    root
     |-- userIdIndex: integer (nullable = false)
     |-- recommendations: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- productIdIndex: integer (nullable = true)
     |    |    |-- rating: float (nullable = true)
    

    (
        recs
        .toJSON()
        .saveAsTextFile("name_i_must_hide.recs")
    )
    

    这些JSON的一个示例是:

    {
      "userIdIndex": 1580,
      "recommendations": [
        {
          "productIdIndex": 10096,
          "rating": 3.6725707
        },
        {
          "productIdIndex": 10141,
          "rating": 3.61542
        },
        {
          "productIdIndex": 11591,
          "rating": 3.536216
        }
      ]
    }
    

    这个 userIdIndex productIdIndex

    IndexToString 数据帧。

    Pipeline 评估者( stages=[StringIndexer, ALS, IndexToString]

    干杯

    2 回复  |  直到 7 年前
        1
  •  4
  •   zero323 little_kid_pea    7 年前

    在这两种情况下,您都需要访问标签列表。可以使用 StringIndexerModel

    user_indexer_model = ...  # type: StringIndexerModel
    user_labels = user_indexer_model.labels
    
    product_indexer_model = ...  # type: StringIndexerModel
    product_labels = product_indexer_model.labels
    

    或列元数据。

    userIdIndex 你可以直接申请 IndexToString

    from pyspark.ml.feature import IndexToString
    
    user_id_to_label = IndexToString(
        inputCol="userIdIndex", outputCol="userId", labels=user_labels)
    user_id_to_label.transform(recs)
    

    对于建议,您需要 udf 或者像这样的表达:

    from pyspark.sql.functions import array, col, lit, struct
    
    n = 3  # Same as numItems
    
    product_labels_ = array(*[lit(x) for x in product_labels])
    recommendations = array(*[struct(
        product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
        col("recommendations")[i]["rating"].alias("rating")
    ) for i in range(n)])
    
    recs.withColumn("recommendations", recommendations)
    
        2
  •  0
  •   sajjad    2 年前

    您可以使用 IndexToString StringIndexer 针对用户和产品

    
    from pyspark.ml.feature import StringIndexer, IndexToString
    idx_to_user = IndexToString(inputCol='userIdIndex',outputCol='user_id').setLabels(self.user_indexer.labels)
    idx_to_prod = IndexToString(inputCol='productIdIndex',outputCol='product_id').setLabels(self.prod_indexer.labels)
    
    recoms = idx_to_user.transform(recs)
    res = self.idx_to_prod.transform(recoms.select(F.col('user_id'),F.explode('recommendations')).select('user_id','col.productIdIndex','col.rating'))
    result = res.select('user_id','product_id','rating')