DataSet
/**
* Compute the probabilities of each value on the given [[DataSet]]
*
* @param x single colum [[DataSet]]
* @return Sequence of probabilites for each value
*/
private[this] def probs(x: DataSet[Double]): Seq[Double] = {
val counts = x.groupBy(_.doubleValue)
.reduceGroup(_.size.toDouble)
.name("X Probs")
.collect
val total = counts.sum
counts.map(_ / total)
}
问题是,当我提交我的flink作业时,它使用了这个方法,它导致flink由于一个任务而终止作业
TimeOut
. 我正在为
数据集
有没有办法让我的代码更有效?
试了几次后,我就成功了
mapPartition
InformationTheory
SymmetricalUncertainty
计算如下:
/**
* Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
*
* It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
*
* @param xy [[DataSet]] with two features
* @return SU value
*/
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.mapPartitionWith {
case in â
val x = in map (_._2)
val y = in map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
Some(2 * mu / (Hx + Hy))
}
su.collect.head.head
}
有了这个,我可以有效地计算
entropy
问题是,它只在并行度为1的情况下工作,问题在于
地图分割
.
对称确定性
,但以何种程度的平行性呢?