代码之家  ›  专栏  ›  技术社区  ›  Alejandro Alcalde


  •  1
  • Alejandro Alcalde  · 技术社区  · 6 年前


       * Compute the probabilities of each value on the given [[DataSet]]
       * @param x single colum [[DataSet]]
       * @return Sequence of probabilites for each value
      private[this] def probs(x: DataSet[Double]): Seq[Double] = {
            val counts = x.groupBy(_.doubleValue)
              .name("X Probs")
            val total = counts.sum
            counts.map(_ / total)

    问题是,当我提交我的flink作业时,它使用了这个方法,它导致flink由于一个任务而终止作业 TimeOut . 我正在为 数据集


    试了几次后,我就成功了 mapPartition InformationTheory SymmetricalUncertainty 计算如下:

       * Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
       * It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
       * @param xy [[DataSet]] with two features
       * @return SU value
      def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
        val su = xy.mapPartitionWith {
          case in ⇒
            val x = in map (_._2)
            val y = in map (_._1)
            val mu = mutualInformation(x, y)
            val Hx = entropy(x)
            val Hy = entropy(y)
            Some(2 * mu / (Hx + Hy))

    有了这个,我可以有效地计算 entropy 问题是,它只在并行度为1的情况下工作,问题在于 地图分割 .

    对称确定性 ,但以何种程度的平行性呢?

    1 回复  |  直到 6 年前
  •  0
  •   Alejandro Alcalde    6 年前


    def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
        val su = xy.reduceGroup { in ⇒
            val invec = in.toVector
            val x = invec map (_._2)
            val y = invec map (_._1)
            val mu = mutualInformation(x, y)
            val Hx = entropy(x)
            val Hy = entropy(y)
            2 * mu / (Hx + Hy)

    你可以在 InformationTheory.scala ,及其测试 InformationTheorySpec.scala