到目前为止我一直在做的是如下的csv:
val data = env.readCsvFile[ElecNormNew](getClass.getResource("/elecNormNew.arff").getPath)
val dataSet = data map { tuple =>
val list = tuple.productIterator.toList
val numList = list map (_.asInstanceOf[Double])
LabeledVector(numList(8), DenseVector(numList.take(8).toArray))
}
在哪里?
ElecNorNew
是一个
case class
:
case class ElecNormNew(
var date: Double,
var day: Double,
var period: Double,
var nswprice: Double,
var nswdemand: Double,
var vicprice: Double,
var vicdemand: Double,
var transfer: Double,
var label: Double) extends Serializable {
}
如规定
Flink's docs
. 但现在我正试图阅读一个包含53列的csv。有没有一种方法可以自动化这个过程?是否需要创建具有53个字段的POJO?
更新
在法比安的回答之后,我正在尝试:
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
val rowIF = new RowCsvInputFormat(new Path(getClass.getResource("/lungcancer.csv").getPath), fieldTypes)
val csvData: DataSet[Row] = env.createInput[Row](rowIF)
val dataSet2 = csvData.map { tuple =>
???
}
但不知道如何继续,我应该如何使用
RowTypeInfo
?