代码之家  ›  专栏  ›  技术社区  ›  Decrayer

在Apache Spark数据集上应用flatMap操作时出现意外的编码器行为

  •  2
  • Decrayer  · 技术社区  · 6 年前

    我正在尝试将实际上包含双值的csv字符串转换为与spark ml兼容的数据集。由于我事先不知道需要多少功能,我决定使用一个助手类“Instance”,它已经包含了分类器要使用的正确数据类型,并且在其他一些情况下已经按照预期工作:

    public class Instance implements Serializable {
        /**
         * 
         */
        private static final long serialVersionUID = 6091606543088855593L;
        private Vector indexedFeatures;
        private double indexedLabel;
        ...getters and setters for both fields...
    }
    

    我得到意外行为的部分是:

        Encoder<Instance> encoder = Encoders.bean(Instance.class);
        System.out.println("encoder.schema()");
        encoder.schema().printTreeString();
        Dataset<Instance> dfInstance = df.select("value").as(Encoders.STRING())
                  .flatMap(s -> {
                    String[] splitted = s.split(",");
    
                    int length = splitted.length;
                    double[] features = new double[length-1];
                    for (int i=0; i<length-1; i++) {
                        features[i] = Double.parseDouble(splitted[i]);
                    }
    
                    if (length < 2) {
                        return Collections.emptyIterator();
                    } else {
                        return Collections.singleton(new Instance( 
                            Vectors.dense(features), 
                            Double.parseDouble(splitted[length-1])
                            )).iterator();
                    }
                  }, encoder);
    
        System.out.println("dfInstance");
        dfInstance.printSchema();
        dfInstance.show(5);
    

    我在控制台上得到以下输出:

    encoder.schema()
    root
     |-- indexedFeatures: vector (nullable = true)
     |-- indexedLabel: double (nullable = false)
    
    dfInstance
    root
     |-- indexedFeatures: struct (nullable = true)
     |-- indexedLabel: double (nullable = true)
    
    +---------------+------------+
    |indexedFeatures|indexedLabel|
    +---------------+------------+
    |             []|         0.0|
    |             []|         0.0|
    |             []|         1.0|
    |             []|         0.0|
    |             []|         1.0|
    +---------------+------------+
    only showing top 5 rows
    

    编码器架构将indexedFeatures行数据类型正确显示为向量。但是,当我应用编码器并进行转换时,它将给我一行类型为struct的数据,其中不包含真实的对象。

    我想理解,为什么Spark为我提供了结构类型而不是正确的向量类型。

    1 回复  |  直到 6 年前
        1
  •  1
  •   werner    6 年前

    实际上,我的答案并不能解释为什么会得到结构类型。但基于 previous question ,我可能可以提供一个解决方法。

    原始输入使用 DataFrameReader's csv function ,然后再次 VectorAssembler 使用时间:

    Dataset<Row> csv = spark.read().option("inferSchema", "true")
      .csv(inputDf.select("value").as(Encoders.STRING()));
    String[] fieldNames = csv.schema().fieldNames();    
    VectorAssembler assembler = new VectorAssembler().setInputCols(
      Arrays.copyOfRange(fieldNames, 0, fieldNames.length-1))
      .setOutputCol("indexedFeatures");
    Dataset<Row> result = assembler.transform(csv)
      .withColumn("indexedLabel", functions.col(fieldNames[fieldNames.length-1]))
      .select("indexedFeatures", "indexedLabel");