代码之家  ›  专栏  ›  技术社区  ›  rupert160

在没有本地迭代器的情况下,如何返回spark rdd分区值?

  •  0
  • rupert160  · 技术社区  · 6 年前

    我正在学习Spark及其与RDD分区分布相关的并行性。我有一台4 CPU的机器,因此我有4个并行单元。要返回分区索引“0”的成员,在不强制RDD使用本地化器的情况下,我找不到返回此分区的方法。

    我习惯于保持简洁。是否有更简洁的方法按分区筛选RDD?以下两种方法是可行的,但似乎很笨拙。

    scala> val data = 1 to 20
    data: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    
    scala> val distData = sc.parallelize(data)
    distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[75] at parallelize at <console>:26
    
    scala> distData.mapPartitionsWithIndex{
       (index,it) => {
          it.toList.map(x => if (index == 0) (x)).iterator
       }
    }.toLocalIterator.toList.filterNot(
       _.isInstanceOf[Unit]
    )
    res107: List[AnyVal] = List(1, 2, 3, 4, 5)
    
    scala> distData.mapPartitionsWithIndex{
       (index,it) => {
          it.toList.map(x => if (index == 0) (x)).iterator
       }
    }.toLocalIterator.toList.filter(
       _ match{
          case x: Unit => false
          case x => true
       }
    )
    res108: List[AnyVal] = List(1, 2, 3, 4, 5)
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   philantrovert    6 年前
    distData.mapPartitionsWithIndex{ (index, it) => 
          if (index == 0) it else Array[Int]().iterator
    }
    

    您可以返回一个空的迭代器,它将正常工作。