代码之家  ›  专栏  ›  技术社区  ›  Alejandro Alcalde

在Apache Flink中读取超过22列的csv

  •  0
  • Alejandro Alcalde  · 技术社区  · 6 年前

    到目前为止我一直在做的是如下的csv:

    val data = env.readCsvFile[ElecNormNew](getClass.getResource("/elecNormNew.arff").getPath)
    
    val dataSet = data map { tuple =>
          val list = tuple.productIterator.toList
          val numList = list map (_.asInstanceOf[Double])
          LabeledVector(numList(8), DenseVector(numList.take(8).toArray))
        }
    

    在哪里? ElecNorNew 是一个 case class :

    case class ElecNormNew(
      var date: Double,
      var day: Double,
      var period: Double,
      var nswprice: Double,
      var nswdemand: Double,
      var vicprice: Double,
      var vicdemand: Double,
      var transfer: Double,
      var label: Double) extends Serializable {
    }
    

    如规定 Flink's docs . 但现在我正试图阅读一个包含53列的csv。有没有一种方法可以自动化这个过程?是否需要创建具有53个字段的POJO?

    更新

    在法比安的回答之后,我正在尝试:

    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
      val rowIF = new RowCsvInputFormat(new Path(getClass.getResource("/lungcancer.csv").getPath), fieldTypes)
      val csvData: DataSet[Row] = env.createInput[Row](rowIF)
      val dataSet2 = csvData.map { tuple =>
          ???
      }
    

    但不知道如何继续,我应该如何使用 RowTypeInfo ?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Fabian Hueske    6 年前

    你可以使用 RowCsvInputFormat 如下:

    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
    
    val rowIF = new RowCsvInputFormat(new Path("file:///myCsv"), fieldTypes)
    val csvData: DataSet[Row] = env.createInput[Row](rowIF)
    

    Row 将数据存储在 Array[Any] . 因此,Flink无法自动推断 . 这使得比类型化的元组或case类更难使用。你需要明确地提供 RowTypeInfo 使用正确的类型。这可以作为隐式值或通过扩展 ResultTypeQueryable 接口。