代码之家  ›  专栏  ›  技术社区  ›  Brian Correro

Quality+Row_number语句的SPARK SQL等效语句

  •  9
  • Brian Correro  · 技术社区  · 9 年前

    有人知道Apache Spark SQL实现与标准SQL qualify()+rnk或row_number语句相同结果的最佳方法吗?

    例如:

    • 我有一个名为statement_data的Spark数据框架,每个数据框架有12个月记录,每个月记录有100个唯一的account_numbers,因此总共有1200条记录
    • 每个月记录都有一个名为“statement_date”的字段,可用于确定最近的记录

    我希望我的最终结果是一个新的Spark Dataframe,其中包含100个唯一account_numbers中的每一个的3个最新记录(由statement_date降序确定),因此总共有300个最终记录。

    在标准Teradata SQL中,我可以执行以下操作:

    select * from statement_data
    qualify row_number ()
    over(partition by acct_id order by statement_date desc) <= 3
    

    据我所知,ApacheSparkSQL没有一个独立的限定函数,可能是语法错误,或者找不到限定存在的文档。

    如果我需要分两个步骤来做,只要这两个步骤是:

    • 为每个account_number的记录分配等级/行编号的选择查询或替代方法
    • 一个选择查询,其中我选择了排名<=的所有记录3(即选择最近的第1、第2和第3条记录)。

    编辑1-7/23 2:09下午: zero323提供的初始解决方案在安装了Spark SQL 1.4.1依赖项的Spark 1.4.1中不适用。

    编辑2-7/23下午3:24: 事实证明,错误与对查询使用SQLContext对象而不是HiveContext有关。添加以下代码以创建和使用配置单元上下文后,我现在能够正确运行以下解决方案:

    final JavaSparkContext sc2;
    final HiveContext hc2;
    DataFrame df;
    hc2 = TestHive$.MODULE$;
    sc2 = new JavaSparkContext(hc2.sparkContext()); 
    ....
    // Initial Spark/SQL contexts to set up Dataframes  
    SparkConf conf = new SparkConf().setAppName("Statement Test");
    ...
    DataFrame stmtSummary = 
        hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");
    
    1 回复  |  直到 7 年前
        1
  •  12
  •   Community CDub    7 年前

    没有 qualify (检查通常很有用 parser source )但是您可以像这样使用子查询:

    SELECT * FROM (
        SELECT *, row_number() OVER (
            PARTITION BY acct_id ORDER BY statement_date DESC
        ) rank FROM df
     ) tmp WHERE rank <= 3
    

    另请参见 SPARK : failure: ``union'' expected but `(' found