
Joining two datasets with predicate pushdown

  •  4
  •  Etibu  ·  Tech Community  ·  7 years ago

    I have a Dataset that I created from an RDD, and I want to join it with a DataFrame loaded from a Phoenix table:

    // Dataset created from an existing RDD
    val dfToJoin = sparkSession.createDataset(rddToJoin)

    // DataFrame backed by a Phoenix table, read via the phoenix-spark connector
    val tableDf = sparkSession
      .read
      .option("table", "table")
      .option("zkURL", "localhost")
      .format("org.apache.phoenix.spark")
      .load()

    // Join on the shared column
    val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")
    

    When I execute it, it looks like the whole database table is loaded to perform the join.

    Also: the Dataset is smaller than the table; I don't know whether that is important.

    Edit: Basically, I want to join my Phoenix table with a Dataset created through Spark, without fetching the whole table into the executors. Here is the physical plan:

    *Project [FEATURE#21, SEQUENCE_IDENTIFIER#22, TAX_NUMBER#23, WINDOW_NUMBER#24, uniqueIdentifier#5, readLength#6]
    +- *SortMergeJoin [FEATURE#21], [feature#4], Inner
       :- *Sort [FEATURE#21 ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(FEATURE#21, 200)
       :     +- *Filter isnotnull(FEATURE#21)
       :        +- *Scan PhoenixRelation(FEATURES,localhost,false) [FEATURE#21,SEQUENCE_IDENTIFIER#22,TAX_NUMBER#23,WINDOW_NUMBER#24] PushedFilters: [IsNotNull(FEATURE)], ReadSchema: struct<FEATURE:int,SEQUENCE_IDENTIFIER:string,TAX_NUMBER:int,WINDOW_NUMBER:int>
       +- *Sort [feature#4 ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(feature#4, 200)
             +- *Filter isnotnull(feature#4)
                +- *SerializeFromObject [assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).feature AS feature#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).uniqueIdentifier, true) AS uniqueIdentifier#5, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).readLength AS readLength#6]
                   +- Scan ExternalRDDScan[obj#3]
    

    As you can see, the equals filter is not included in the PushedFilters list, so it is clear that no predicate pushdown is happening.
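
    For comparison, a literal predicate applied directly to the Phoenix DataFrame should show up under PushedFilters. A minimal sketch (the column FEATURE comes from the plan above; the literal 42 is purely illustrative):

    import org.apache.spark.sql.functions.col

    // A direct filter on the source DataFrame can be pushed down to Phoenix;
    // explain() should then list it in the scan's PushedFilters,
    // e.g. [IsNotNull(FEATURE), EqualTo(FEATURE,42)].
    val directFilterDf = tableDf.filter(col("FEATURE") === 42)
    directFilterDf.explain()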

    1 Answer  |  7 years ago
  •  7
  •  mrsrinivas  ·  7 years ago

    Spark will fetch the whole Phoenix table's records into the executors (not the whole table into a single executor, since the work is distributed, but the entire table nonetheless).

    Since there is no direct filter on the Phoenix DataFrame, we only see *Filter isnotnull(FEATURE#21) in the plan.


    As you mentioned, less data comes back when you apply a filter to the Phoenix table. So push the filter down to the Phoenix table on the feature column by first collecting the feature IDs (feature_ids) from the other dataset:

    import sparkSession.implicits._ // needed for the $"..." column syntax and Dataset encoders

    // This is spread across the workers - fully distributed
    val dfToJoin = sparkSession.createDataset(rddToJoin)

    // This list sits on the driver - not distributed
    val list_of_feature_ids = dfToJoin.dropDuplicates("feature")
      .select("feature")
      .map(r => r.getInt(0)) // FEATURE is an int per the ReadSchema in the plan above
      .collect
      .toList

    // This is spread across the workers - fully distributed
    val tableDf = sparkSession
      .read
      .option("table", "table")
      .option("zkURL", "localhost")
      .format("org.apache.phoenix.spark")
      .load()
      .filter($"FEATURE".isin(list_of_feature_ids:_*)) // added filter, pushed down to Phoenix

    // This is spread across the workers - fully distributed
    val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")

    joinedDf.explain()
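
    If the list of feature IDs gets large, the generated IN predicate can become unwieldy. An alternative worth considering is a broadcast join, which avoids shuffling both sides but does not push any predicate into Phoenix (a sketch, assuming dfToJoin is small enough to fit in executor memory):

    import org.apache.spark.sql.functions.broadcast

    // Broadcasting the small side replaces the SortMergeJoin (and its
    // Exchange/shuffle steps) with a BroadcastHashJoin. Note that the full
    // Phoenix table is still scanned at the source; only the shuffle is avoided.
    val broadcastJoinedDf = tableDf.join(broadcast(dfToJoin), "columnToJoinOn")
    broadcastJoinedDf.explain()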