有人知道Apache Spark SQL实现与标准SQL qualify()+rnk或row_number语句相同结果的最佳方法吗?
例如:
-
我有一个名为statement_data的Spark数据框架,每个数据框架有12个月记录,每个月记录有100个唯一的account_numbers,因此总共有1200条记录
-
每个月记录都有一个名为“statement_date”的字段,可用于确定最近的记录
我希望我的最终结果是一个新的Spark Dataframe,其中包含100个唯一account_numbers中的每一个的3个最新记录(由statement_date降序确定),因此总共有300个最终记录。
在标准Teradata SQL中,我可以执行以下操作:
select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3
据我所知,ApacheSparkSQL没有一个独立的限定函数,可能是语法错误,或者找不到限定存在的文档。
如果我需要分两个步骤来做,只要这两个步骤是:
-
为每个account_number的记录分配等级/行编号的选择查询或替代方法
-
一个选择查询,其中我选择了排名<=的所有记录3(即选择最近的第1、第2和第3条记录)。
编辑1-7/23 2:09下午:
zero323提供的初始解决方案在安装了Spark SQL 1.4.1依赖项的Spark 1.4.1中不适用。
编辑2-7/23下午3:24:
事实证明,错误与对查询使用SQLContext对象而不是HiveContext有关。添加以下代码以创建和使用配置单元上下文后,我现在能够正确运行以下解决方案:
final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext());
....
// Initial Spark/SQL contexts to set up Dataframes
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
DataFrame stmtSummary =
hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");