代码之家  ›  专栏  ›  技术社区  ›  Arshanvit

spark for api调用中并行执行的spark时间

  •  0
  • Arshanvit  · 技术社区  · 6 年前

    我在我的8Gigs笔记本电脑里做这件事,在Intellij中运行代码。我正在调用3个API map 功能与 scalaj 库和计算调用每个API的时间如下:

    val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"))
    //for each API call,execute them in different executor and collate data 
    val actual_data = urls.map(x => spark.time(HTTPRequestParallel.ds(x))) 
    

    什么时候? spark.time 执行时,我期望有3组时间,但它给了我6组时间

    Time taken: 14945 ms
    Time taken: 21773 ms
    Time taken: 22446 ms
    Time taken: 6438 ms
    Time taken: 6877 ms
    Time taken: 7107 ms
    

    我在这里遗漏了什么,它实际上是对API的并行调用吗?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Mikel San Vicente    6 年前

    事实上,单是这段代码就不会执行任何 spark.time 总之,map函数是懒惰的,因此在执行 RDD . 你也应该考虑,如果你不坚持你的转变 RDD 它将重新计算每个操作的所有转换。这意味着如果你在做这样的事情:

    val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"))
    //for each API call,execute them in different executor and collate data 
    val actual_data = urls.map(x => spark.time(HTTPRequestParallel.ds(x))) 
    val c = actual_data.count()
    actual_data.collect()
    

    将有6次执行 map (每个元素两个 RDD ,第一个 count 第二个是 collect )为了避免这种重新计算,可以缓存或保持 RDD 如下

    val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"))
    //for each API call,execute them in different executor and collate data 
    val actual_data = urls.map(x => spark.time(HTTPRequestParallel.ds(x))).cache() 
    val c = actual_data.count()
    actual_data.collect()
    

    在第二个示例中,您将只看到3个日志,而不是6个