代码之家  ›  专栏  ›  技术社区  ›  Alejandro Alcalde

优化Flink变换

  •  1
  • Alejandro Alcalde  · 技术社区  · 6 年前

    DataSet

    /**
       * Compute the probabilities of each value on the given [[DataSet]]
       *
       * @param x single colum [[DataSet]]
       * @return Sequence of probabilites for each value
       */
      private[this] def probs(x: DataSet[Double]): Seq[Double] = {
            val counts = x.groupBy(_.doubleValue)
              .reduceGroup(_.size.toDouble)
              .name("X Probs")
              .collect
    
            val total = counts.sum
    
            counts.map(_ / total)
      }
    

    问题是,当我提交我的flink作业时,它使用了这个方法,它导致flink由于一个任务而终止作业 TimeOut . 我正在为 数据集

    有没有办法让我的代码更有效?

    试了几次后,我就成功了 mapPartition InformationTheory SymmetricalUncertainty 计算如下:

    /**
       * Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
       *
       * It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
       *
       * @param xy [[DataSet]] with two features
       * @return SU value
       */
      def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
        val su = xy.mapPartitionWith {
          case in ⇒
            val x = in map (_._2)
            val y = in map (_._1)
    
            val mu = mutualInformation(x, y)
            val Hx = entropy(x)
            val Hy = entropy(y)
    
            Some(2 * mu / (Hx + Hy))
        }
    
        su.collect.head.head
      }
    

    有了这个,我可以有效地计算 entropy 问题是,它只在并行度为1的情况下工作,问题在于 地图分割 .

    对称确定性 ,但以何种程度的平行性呢?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Alejandro Alcalde    6 年前

    我终于做到了,不知道这是不是最好的解决方案,但它使用了n级并行:

    def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
        val su = xy.reduceGroup { in ⇒
            val invec = in.toVector
            val x = invec map (_._2)
            val y = invec map (_._1)
    
            val mu = mutualInformation(x, y)
            val Hx = entropy(x)
            val Hy = entropy(y)
    
            2 * mu / (Hx + Hy)
        }
    
        su.collect.head
      } 
    

    你可以在 InformationTheory.scala ,及其测试 InformationTheorySpec.scala