代码之家  ›  专栏  ›  技术社区  ›  Marc Le Bihan

我知道如何对数据集执行orderBy(“a”、“b”…),即groupBy。需要对每一个子集进行计算

  •  0
  • Marc Le Bihan  · 技术社区  · 4 年前

    我在看一份城市会计档案。我的目标是为每个机构的每个会计编号提供一些信息丰富的小计:

    阵风3 , 积云3 )到( 积雨d7 积雨云7 )添加到记录中,并聚合 解决方案 soldeCrediteur公司 13248 将聚集在 1324 132 例如,级别。

    +--------------------------+----------+-----------------+---------------------+---------------------+---------+----------+------------+-----------+------------+----------+---------------------+-----------+------------+------------------+-------------------+------------------------+-------------------------+---------------------------+----------------------------+-----------------------------+------------------------------+-------------+--------------+-------------+---------------+--------------------------+--------+--------+-----------------------------------------------------------------------------------------------------+-------------------------+------------+----------------+----------------+----------+----------+----------------+----------+----------+----------------+----------+---------+---------------+-----------+--------------+----------------+--------+---------+
    |libelleBudget             |typeBudget|typeEtablissement|sousTypeEtablissement|nomenclatureComptable|siren    |codeRegion|codeActivite|codeSecteur|numeroFINESS|codeBudget|categorieCollectivite|typeBalance|numeroCompte|balanceEntreeDebit|balanceEntreeCredit|operationBudgetaireDebit|operationBudgetaireCredit|operationNonBudgetaireDebit|operationNonBudgetaireCredit|operationOrdreBudgetaireDebit|operationOrdreBudgetaireCredit|soldeDebiteur|soldeCrediteur|anneeExercice|budgetPrincipal|nombreChiffresNumeroCompte|cumulSD7|cumulSC7|libelleCompte                                                                                        |nomenclatureComptablePlan|sirenCommune|populationTotale|numeroCompteSur3|cumulSD3  |cumulSC3  |numeroCompteSur4|cumulSD4  |cumulSC4  |numeroCompteSur5|cumulSD5  |cumulSC5 |codeDepartement|codeCommune|siret         |numeroCompteSur6|cumulSD6|cumulSC6 |
    +--------------------------+----------+-----------------+---------------------+---------------------+---------+----------+------------+-----------+------------+----------+---------------------+-----------+------------+------------------+-------------------+------------------------+-------------------------+---------------------------+----------------------------+-----------------------------+------------------------------+-------------+--------------+-------------+---------------+--------------------------+--------+--------+-----------------------------------------------------------------------------------------------------+-------------------------+------------+----------------+----------------+----------+----------+----------------+----------+----------+----------------+----------+---------+---------------+-----------+--------------+----------------+--------+---------+
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1021        |0.0               |349139.71          |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |349139.71     |2019         |true           |4                         |0.0     |0.0     |Dotation                                                                                             |M14                      |210100012   |794             |102             |0.0       |995427.19 |1021            |0.0       |349139.71 |1021            |0.0       |0.0      |01             |01001      |21010001200017|1021            |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10222       |0.0               |554545.85          |0.0                     |30003.0                  |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |584548.85     |2019         |true           |5                         |0.0     |0.0     |F.C.T.V.A.                                                                                           |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10222           |0.0       |584548.85|01             |01001      |21010001200017|10222           |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10223       |0.0               |4946.0             |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |4946.0        |2019         |true           |5                         |0.0     |0.0     |T.L.E.                                                                                               |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10223           |0.0       |4946.0   |01             |01001      |21010001200017|10223           |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10226       |0.0               |41753.65           |0.0                     |12078.54                 |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |53832.19      |2019         |true           |5                         |0.0     |0.0     |Taxe d’aménagement                                                                                   |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10226           |0.0       |53832.19 |01             |01001      |21010001200017|10226           |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10227       |0.0               |2960.44            |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |2960.44       |2019         |true           |5                         |0.0     |0.0     |Versement pour sous-densité                                                                          |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10227           |0.0       |2960.44  |01             |01001      |21010001200017|10227           |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1068        |0.0               |2281475.34         |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |2281475.34    |2019         |true           |4                         |0.0     |0.0     |Excédents de fonctionnement capitalisés                                                              |M14                      |210100012   |794             |106             |0.0       |2281475.34|1068            |0.0       |2281475.34|1068            |0.0       |0.0      |01             |01001      |21010001200017|1068            |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |110         |0.0               |97772.73           |0.0                     |0.0                      |0.0                        |112620.66                   |0.0                          |0.0                           |0.0          |210393.39     |2019         |true           |3                         |0.0     |0.0     |Report à nouveau (solde créditeur)                                                                   |M14                      |210100012   |794             |110             |0.0       |210393.39 |110             |0.0       |0.0       |110             |0.0       |0.0      |01             |01001      |21010001200017|110             |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |12          |0.0               |112620.66          |0.0                     |0.0                      |112620.66                  |0.0                         |0.0                          |0.0                           |0.0          |0.0           |2019         |true           |2                         |0.0     |0.0     |RÉSULTAT DE L'EXERCICE (excédentaire ou déficitaire)                                                 |M14                      |210100012   |794             |12              |0.0       |0.0       |12              |0.0       |0.0       |12              |0.0       |0.0      |01             |01001      |21010001200017|12              |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1321        |0.0               |29097.78           |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |29097.78      |2019         |true           |4                         |0.0     |0.0     |État et établissements nationaux                                                                     |M14                      |210100012   |794             |132             |0.0       |296722.26 |1321            |0.0       |29097.78  |1321            |0.0       |0.0      |01             |01001      |21010001200017|1321            |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1322        |0.0               |201.67             |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |201.67        |2019         |true           |4                         |0.0     |0.0     |Régions                                                                                              |M14                      |210100012   |794             |132             |0.0       |296722.26 |1322            |0.0       |201.67    |1322            |0.0       |0.0      |01             |01001      |21010001200017|1322            |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1323        |0.0               |163194.37          |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |163194.37     |2019         |true           |4                         |0.0     |0.0     |Départements                                                                                         |M14                      |210100012   |794             |132             |0.0       |296722.26 |1323            |0.0       |163194.37 |1323            |0.0       |0.0      |01             |01001      |21010001200017|1323            |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |13248       |0.0               |1129.37            |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |1129.37       |2019         |true           |5                         |0.0     |0.0     |Autres communes                                                                                      |M14                      |210100012   |794             |132             |0.0       |296722.26 |1324            |0.0       |1129.37   |13248           |0.0       |1129.37  |01             |01001      |21010001200017|13248           |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |13251       |0.0               |47079.11           |0.0                     |2387.05                  |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |49466.16      |2019         |true           |5                         |0.0     |0.0     |GFP de rattachement                                                                                  |M14                      |210100012   |794             |132             |0.0       |296722.26 |1325            |0.0       |49532.16  |13251           |0.0       |49466.16 |01             |01001      |21010001200017|13251           |0.0     |0.0      |
    |ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |13258       |0.0               |66.0               |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |66.0          |2019         |true           |5                         |0.0     |0.0     |Autres groupements                                                                                   |M14                      |210100012   |794             |132             |0.0       |296722.26 |1325            |0.0       |49532.16  |13258           |0.0       |66.0     |01             |01001      |21010001200017|13258           |0.0     |0.0      |
    

    +--------------+------------+-------------+--------------+--------+--------+--------+--------+---------+---------+----------+----------+----------+----------+
    |         siret|numeroCompte|soldeDebiteur|soldeCrediteur|cumulSD7|cumulSC7|cumulSD6|cumulSC6| cumulSD5| cumulSC5|  cumulSD4|  cumulSC4|  cumulSD3|  cumulSC3|
    +--------------+------------+-------------+--------------+--------+--------+--------+--------+---------+---------+----------+----------+----------+----------+
    |21010001200017|        1021|          0.0|     349139.71|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 349139.71|       0.0| 995427.19|
    |21010001200017|       10222|          0.0|     584548.85|     0.0|     0.0|     0.0|     0.0|      0.0|584548.85|       0.0| 646287.48|       0.0| 995427.19|
    |21010001200017|       10223|          0.0|        4946.0|     0.0|     0.0|     0.0|     0.0|      0.0|   4946.0|       0.0| 646287.48|       0.0| 995427.19|
    |21010001200017|       10226|          0.0|      53832.19|     0.0|     0.0|     0.0|     0.0|      0.0| 53832.19|       0.0| 646287.48|       0.0| 995427.19|
    |21010001200017|       10227|          0.0|       2960.44|     0.0|     0.0|     0.0|     0.0|      0.0|  2960.44|       0.0| 646287.48|       0.0| 995427.19|
    |21010001200017|        1068|          0.0|    2281475.34|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|2281475.34|       0.0|2281475.34|
    |21010001200017|         110|          0.0|     210393.39|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|       0.0|       0.0| 210393.39|
    |21010001200017|          12|          0.0|           0.0|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|       0.0|       0.0|       0.0|
    |21010001200017|        1321|          0.0|      29097.78|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|  29097.78|       0.0| 296722.26|
    |21010001200017|        1322|          0.0|        201.67|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|    201.67|       0.0| 296722.26|
    |21010001200017|        1323|          0.0|     163194.37|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 163194.37|       0.0| 296722.26|
    |21010001200017|       13248|          0.0|       1129.37|     0.0|     0.0|     0.0|     0.0|      0.0|  1129.37|       0.0|   1129.37|       0.0| 296722.26|
    |21010001200017|       13251|          0.0|      49466.16|     0.0|     0.0|     0.0|     0.0|      0.0| 49466.16|       0.0|  49532.16|       0.0| 296722.26|
    |21010001200017|       13258|          0.0|          66.0|     0.0|     0.0|     0.0|     0.0|      0.0|     66.0|       0.0|  49532.16|       0.0| 296722.26|
    |21010001200017|        1328|          0.0|      53566.91|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|  53566.91|       0.0| 296722.26|
    |21010001200017|        1341|          0.0|     142734.21|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 142734.21|       0.0| 145233.21|
    |21010001200017|        1342|          0.0|        2499.0|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|    2499.0|       0.0| 145233.21|
    |21010001200017|        1383|          0.0|       2550.01|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|   2550.01|       0.0|   2550.01|
    |21010001200017|        1641|          0.0|     236052.94|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 236052.94|       0.0| 236052.94|
    

    这从会计文件开始排序 , , 汽笛 (我们的机构标识)。
    然而,由于缺乏知识,我做了一些让人心碎的事情:

    第一次尝试是昂贵的,但有效的,通过RDD

    /**
     * Créer un dataset de cumuls de comptes parents par siret.
     * @param session Session Spark.
     * @param comptes Dataset des comptes de comptabilités de tous les siret.
     * @return Dataset avec un siret associés à des cumuls par comptes à 7, 6, 5, 4, 3 chiffres, pour soldes de débit et soldes de crédit.
     */
    private Dataset<Row> cumulsComptesParentsParSiret(SparkSession session, Dataset<Row> comptes) {
       JavaPairRDD<String, Iterable<Row>> rddComptesParSiret = comptes.javaRDD().groupBy((Function<Row, String>)compte -> compte.getAs("siret"));
       
       // Réaliser les cumuls par siret et compte, par compte parent.
       JavaRDD<Row> rdd = rddComptesParSiret.flatMap((FlatMapFunction<Tuple2<String, Iterable<Row>>, Row>)comptesSiret -> {
          String siret = comptesSiret._1();
          AccumulateurCompte comptesParentsPourSiret = new AccumulateurCompte(siret);
    
          for(Row rowCompte : comptesSiret._2()) {
             String numeroCompte = rowCompte.getAs("numeroCompte");
             Double soldeSD = rowCompte.getAs("soldeDebiteur");
             Double soldeSC = rowCompte.getAs("soldeCrediteur");
             
             comptesParentsPourSiret.add(numeroCompte, soldeSD, soldeSC);
          }
    
          // Faire une ligne de regroupement siret, compte et ses comptes parents.
          List<Row> rowsCumulsPourSiret = new ArrayList<>();
          
          for(Row rowCompte : comptesSiret._2()) {
             String numeroCompte = rowCompte.getAs("numeroCompte");
             double sd[] = new double[6]; 
             double sc[] = new double[6]; 
             
             for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
                String compteParent = numeroCompte.substring(0, nombreChiffres);
                Double cumulDebits = comptesParentsPourSiret.getCumulSD(compteParent);
                Double cumulCredits = comptesParentsPourSiret.getCumulSC(compteParent);
                
                sd[nombreChiffres - 3] = cumulDebits != null ? Precision.round(cumulDebits, 2, BigDecimal.ROUND_CEILING) : 0.0;
                sc[nombreChiffres - 3] = cumulCredits != null ? Precision.round(cumulCredits, 2, BigDecimal.ROUND_CEILING) : 0.0;
             }
             
             Row rowCumulsPourCompte = RowFactory.create(siret, numeroCompte, sd[4], sc[4], sd[3], sc[3], sd[2], sc[2], sd[1], sc[1], sd[0], sc[0]);
             rowsCumulsPourSiret.add(rowCumulsPourCompte);
          }
          
          return rowsCumulsPourSiret.iterator();
       });
       
       return session.createDataFrame(rdd, schemaCumulComptesParents());
    }
    

    /**
     * Cumuler les sous-comptes.
     * @param comptes Dataset de comptes.
     * @return Dataset aux cumuls de comptes à 3, 4, 5, 6, 7 chiffres réalisés, par commune.
     */
    private Dataset<Row> cumulsSousComptes(Dataset<Row> comptes) {
       Dataset<Row> comptesAvecCumuls = comptes; 
       
       for(int nombreChiffresNiveauCompte = 3; nombreChiffresNiveauCompte < 7; nombreChiffresNiveauCompte ++) {
          comptesAvecCumuls = cumulsCompteParent(comptesAvecCumuls, nombreChiffresNiveauCompte);
       }
       
       return comptesAvecCumuls;
    }
    
    /**
     * Cumul par un niveau de compte parent.
     * @param comptes Liste des comptes.
     * @param nombreChiffres Nombre de chiffres auquel réduire le compte à cummuler. Exemple 4 : 2041582 est cumulé sur 2041. 
     * @return cumuls par compte parent : dataset au format (cumul des soldes débiteurs, cumul des soldes créditeurs).
     */
    private Dataset<Row> cumulsCompteParent(Dataset<Row> comptes, int nombreChiffres) {
       // Cumuler pour un niveau de compte parent sur le préfixe de leurs comptes réduits à nombreChiffres.
       Column nombreChiffresCompte = comptes.col("nombreChiffresNumeroCompte");
       
       String aliasNumeroCompte = MessageFormat.format("numeroCompteSur{0}", nombreChiffres);
       RelationalGroupedDataset group = comptes.groupBy(col("codeDepartement"), col("codeCommune"), col("siret"), col("numeroCompte").substr(1,nombreChiffres).as(aliasNumeroCompte));
       
       String nomChampCumulSD = MessageFormat.format("cumulSD{0}", nombreChiffres);
       String nomChampCumulSC = MessageFormat.format("cumulSC{0}", nombreChiffres);
       Column sd = sum(when(nombreChiffresCompte.$greater$eq(lit(nombreChiffres)), col("soldeDebiteur")).otherwise(lit(0.0))).as(nomChampCumulSD);
       Column sc = sum(when(nombreChiffresCompte.$greater$eq(lit(nombreChiffres)), col("soldeCrediteur")).otherwise(lit(0.0))).as(nomChampCumulSC);
    
       Dataset<Row> cumuls = group.agg(sd, sc);
       
       // Associer à chaque compte la colonne de cumuls de comptes parents, pour le niveau en question.
       Column jointure =  
          comptes.col("codeDepartement").equalTo(cumuls.col("codeDepartement"))
          .and(comptes.col("codeCommune").equalTo(cumuls.col("codeCommune")))
          .and(comptes.col("siret").equalTo(cumuls.col("siret")))
          .and(comptes.col("numeroCompte").substr(1, nombreChiffres).equalTo(cumuls.col(aliasNumeroCompte)));
    
       Dataset<Row> comptesAvecCumuls = comptes.join(cumuls, jointure, "left_outer")
          .drop(comptes.col("siret"))
          .drop(comptes.col("codeDepartement"))
          .drop(comptes.col("codeCommune"))
          .drop(comptes.col(nomChampCumulSD))
          .drop(comptes.col(nomChampCumulSC))
          .withColumnRenamed("cumulSD", nomChampCumulSD)
          .withColumnRenamed("cumulSC", nomChampCumulSC)
          .withColumn(nomChampCumulSD, round(col(nomChampCumulSD), 2))
          .withColumn(nomChampCumulSC, round(col(nomChampCumulSC), 2));
       
       return comptesAvecCumuls;
    }
    

    我所说的低级管理是指:一些最后一分钟的验证,以发出一些警告,或在求和时排除某些值:

    • (一)会计名称变更的。
    • 就我所掌握的其他知识而言,警告一个似乎奇怪的值。

    我想要什么

    我需要单独浏览每个组的行内容。一组接一组。

    火花

    • 第一个参数将给出当前的键值(对于 , 城市代码 , 汽笛
    • 第二个,与这些键相关联的行。
    Dataset<Row> eachGroupContent(Row keys, Dataset<Row> groupContent);
    

    火花

    Row (keys) : {Department : 01, City code : 01001, siret : 21010001200017}
    
    Dataset<Row> (values) associated :
    +---------------+-----------+--------------+------------+-------------+--------------+--------+
    |codeDepartement|codeCommune|         siret|numeroCompte|soldeDebiteur|soldeCrediteur|(others)|
    +---------------+-----------+--------------+------------+-------------+--------------+--------+
    |             01|      01001|21010001200017|        1021|          0.0|     349139.71|     ...|
    |             01|      01001|21010001200017|       10222|          0.0|     584548.85|     ...|
    |             01|      01001|21010001200017|       10223|          0.0|        4946.0|     ...|
    |             01|      01001|21010001200017|       10226|          0.0|      53832.19|     ...|
    
    Row : {Department : 01, City code : 01001, siret : 21010001200033}
    
    Dataset<Row> :
    |             01|      01001|21010001200033|        1021|          0.0|      38863.22|     ...|
    |             01|      01001|21010001200033|       10222|          0.0|       62067.0|     ...|
    |             01|      01001|21010001200033|       10228|          0.0|        9666.0|     ...|
    |             01|      01001|21010001200033|        1068|          0.0|     100121.62|     ...|
    
    Row : {Department : 01, City code : 01001, siret : 21010001200066}
    
    Dataset<Row> :
    |             01|      01001|21010001200066|        1641|          0.0|      100000.0|     ...|
    |             01|      01001|21010001200066|        3355|    587689.33|           0.0|     ...|
    |             01|      01001|21010001200066|        4011|          0.0|           0.0|     ...|
    |             01|      01001|21010001200066|       40171|          0.0|       10036.5|     ...|
    

    这是我第一次尝试能做到的,

    rddComptesParSiret.flatMap((FlatMapFunction<Tuple2<String, Iterable<Row>>, Row>)comptesSiret
    

    但没有提供所有的好钥匙(部门和城市代码丢失,打破了之前的所有分类),而且: RDD 不再赞成。

    但我没能在 Java RelationalGroupedDataset 似乎不提供此类工具的方法。

    目前,我知道如何进行groupBy或sort,即:

    accounting.groupBy("department", "cityCode", "accountNumber", "siret").agg(...);
    

    我的问题

    如何浏览
    每个记录 属于

    [执行子计算或其他工作]
    一组接一组

    0 回复  |  直到 4 年前
        1
  •  1
  •   werner    4 年前

    KeyValueGroupedDataset.mapGroups 将为给定组的所有行提供迭代器。实现接口时 MapGroupsFunction 您可以在整个组中访问这个迭代器。

    Dataset<Row> df = spark.read().option("header", true).option("inferSchema", true).csv(...);
    
    Dataset<Result> resultDf = df
        .groupByKey((MapFunction<Row, Key>) (Row r)
                      -> new Key(r.getInt(r.fieldIndex("codeDepartement")),
                                 r.getInt(r.fieldIndex("codeCommune")),
                                 r.getLong(r.fieldIndex("siret"))),
                      Encoders.bean(Key.class))
        .mapGroups(new MyMapGroupsFunction(), Encoders.bean(Result.class));
    resultDf.show();
    

    一个用于分组列:

    public static class Key {
        private int codeDepartement;
        private int codeCommune;
        private long siret;
        //constructors, getters and setters
        ...
    }
    

    一个用于结果列:

    public static class Result {
        private int codeDepartement;
        private int codeCommune;
        private long siret;
        private double result1;
        private double result2;
        //constructors, getters and setters
        ...
    }
    

    在本例中,我使用了一个由三个键列和两个计算列组成的结果结构 result1 result2 . 可以在这里添加更多的结果列。

    实际的逻辑发生在 MyMapGroupsFunction :

    public static class MyMapGroupsFunction implements MapGroupsFunction<Key, Row, Result> {
    
        @Override
        public Result call(Key key, Iterator<Row> values) throws Exception {
            //drain the iterator into a list. The list now
            //contains all rows that belong to one single group
            List<Row> rows = new ArrayList<>();
            values.forEachRemaining(rows::add);
    
            //now any arbitrary logic can be used to calculate the result values 
            //based on the contents of the list
            double result1 = 0;
            double result2 = 0;
            for (Row r : rows) {
                double cumulSD3 = r.getDouble(r.fieldIndex("cumulSC3"));
                double cumulSD4 = r.getDouble(r.fieldIndex("cumulSC4"));
                result1 += cumulSD3 + cumulSD4;
                result2 += cumulSD3 * cumulSD4;
            }
    
            //return the result consisting of the elements of the key and the calculated values
            return new Result(key.getCodeDepartement(), key.getCodeCommune(),
                    key.getSiret(), result1, result2);
        }
    }
    

    打印我们得到的结果

    +-----------+---------------+--------------------+--------------------+--------------+
    |codeCommune|codeDepartement|             result1|             result2|         siret|
    +-----------+---------------+--------------------+--------------------+--------------+
    |       1001|              1|   692508.8400000001|2.939458891576320...|21010001200019|
    |       1001|              1|1.4411536300000003E7|8.198151013048245E12|21010001200017|
    |       1001|              1|   692508.8400000001|2.939458891576320...|21010001200018|
    +-----------+---------------+--------------------+--------------------+--------------+
    

        2
  •  1
  •   halfer    4 年前

    我在这里添加一个答案来说明appart您的解决方案对我的代码的影响。

    /**
     * Clef de l'établissement dans la ville.
     */
    static class ClefEtablissement {
       /** Code département. */
       private String codeDepartement;
       
       /** Code commune. */
       private String codeCommune;
       
       /** Numéro siret. */
       private String siret;
       
       /**
        * Construire la clef de l'établissement.
        * @param codeDepartement Code département.
        * @param codeCommune Code commune.
        * @param siret Numéro siret.
        */
       ClefEtablissement(String codeDepartement, String codeCommune, String siret) {
          this.setCodeDepartement(codeDepartement);
          this.setCodeCommune(codeCommune);
          this.setSiret(siret);
       }
    
       /**
        * Renvoyer le code du département.
        * @return Code du département.
        */
       public String getCodeDepartement() {
          return this.codeDepartement;
       }
    
       /**
        * Fixer le code du département.
        * @param codeDepartement Code du département. 
        */
       public void setCodeDepartement(String codeDepartement) {
          this.codeDepartement = codeDepartement;
       }
    
       /**
        * Renvoyer le code de la commune.
        * @return Code de la commune.
        */
       public String getCodeCommune() {
          return this.codeCommune;
       }
    
       /**
        * Fixer le code de la commune.
        * @param codeCommune Code de la commune.
        */
       public void setCodeCommune(String codeCommune) {
          this.codeCommune = codeCommune;
       }
    
       /**
        * Renvoyer le numéro SIRET.
        * @return Siret.
        */
       public String getSiret() {
          return this.siret;
       }
    
       /**
        * Fixer le numéro SIRET.
        * @param siret SIRET.
        */
       public void setSiret(String siret) {
          this.siret = siret;
       }
    }
    

    你提供的解决方案 mapGroups(...) 这是最常见的用法。
    因此需要 n 帐户来自 e Dataset<Result> 属于 e 排。每个机构一个 Result call(Key key, Iterator<Row> values) 每次返回一个 Result .

    Dataset<Row> 它仍然 n

    /**
     * Cumuler les comptes racines sur chaque ligne.
     */
    @SuppressWarnings("rawtypes")
    public static class CumulComptesRacinesGroupFunction implements MapGroupsFunction<ClefEtablissement, Row, ArrayList> {
       /** Serial ID. */
       private static final long serialVersionUID = -7519513974536696466L;
    
       /**
        * Cumuler les comptes racines sur chaque ligne d'un groupe.
        */
       @Override
       public ArrayList call(ClefEtablissement etablissement, Iterator<Row> values) throws Exception {
          List<Row> comptes = new ArrayList<>();
          values.forEachRemaining(comptes::add);
          
          ArrayList<Row> cumulsRow = new ArrayList<>();
          Map<String, Double> cumulsSoldesDebits = new HashMap<>();
          Map<String, Double> cumulsSoldesCredits = new HashMap<>();
          
          // Pour chaque compte, cumuler son solde dans comptes racines à n chiffres (qu'il a), n-1, n-2, n-3 ... 3 chiffres. 
          comptes.forEach(compte -> {
             String numeroCompte = compte.getAs("numeroCompte");
    
             for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
                String compteParent = numeroCompte.substring(0, nombreChiffres);
             
                Double soldeDebit = compte.getAs("soldeDebiteur");
                Double soldeCredit = compte.getAs("soldeCrediteur");
                
                cumulsSoldesDebits.put(compteParent, cumulsSoldesDebits.get(compteParent) != null ? cumulsSoldesDebits.get(compteParent) + soldeDebit : soldeDebit);
                cumulsSoldesDebits.put(compteParent, cumulsSoldesCredits.get(compteParent) != null ? cumulsSoldesCredits.get(compteParent) + soldeCredit : soldeCredit);
             }
          });
          
          // Créer des Row(siret, numeroCompte, cumulSoldesDebiteurs à 7 chiffres, cumulSoldeCrediteur à 7 chiffres, ..., , cumulSoldesDebiteurs à 3 chiffres, cumulSoldeCrediteur à 3 chiffres) 
          for(Row compte : comptes) {
             String numeroCompte = compte.getAs("numeroCompte");
             double sd[] = new double[6]; 
             double sc[] = new double[6]; 
             
             for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
                String compteParent = numeroCompte.substring(0, nombreChiffres);
                Double cumulDebits = cumulsSoldesDebits.get(compteParent);
                Double cumulCredits = cumulsSoldesCredits.get(compteParent);
                
                sd[nombreChiffres - 3] = cumulDebits != null ? Precision.round(cumulDebits, 2, BigDecimal.ROUND_CEILING) : 0.0;
                sc[nombreChiffres - 3] = cumulCredits != null ? Precision.round(cumulCredits, 2, BigDecimal.ROUND_CEILING) : 0.0;
             }
             
             Row rowCumulsPourCompte = RowFactory.create(etablissement.getSiret(), numeroCompte, sd[4], sc[4], sd[3], sc[3], sd[2], sc[2], sd[1], sc[1], sd[0], sc[0]);         
             cumulsRow.add(rowCumulsPourCompte);
          }
    
          return cumulsRow;
       }
    }
    

    如您所见,它涉及到使用 ArrayList ,一些工会在最后(我还没有运行它)。但是你看到了整个问题:它笨拙而且看起来。。。不安全的。

    /**
     * Calculer Rassembler les comptes.
     * @param session Session Spark.
     * @param comptes Comptes candidats.
     * @return Liste des comptes complétés sur chaque ligne de leur comptes racines cumulés.
     */
    protected Dataset<Row> calculerRacinesDesComptes(SparkSession session, Dataset<Row> comptes) {
       Dataset<ArrayList> comptesParSiret = comptes
          .groupByKey((MapFunction<Row, ClefEtablissement>) (Row r) -> 
             new ClefEtablissement(r.getAs("codeDepartement"), r.getAs("codeCommune"), r.getAs("siret")), Encoders.bean(ClefEtablissement.class))
          .mapGroups(new CumulComptesRacinesGroupFunction(), Encoders.bean(ArrayList.class));
       
       StructType schema = new StructType()
          .add("siret", StringType, false)
          .add("numeroCompte", StringType, false)
          .add("soldeDebiteur7chiffres", StringType, false)
          .add("soldeCrediteur7chiffres", StringType, false)
          .add("soldeDebiteur6chiffres", StringType, false)
          .add("soldeCrediteur6chiffres", StringType, false)
          .add("soldeDebiteur5chiffres", StringType, false)
          .add("soldeCrediteur5chiffres", StringType, false)
          .add("soldeDebiteur4chiffres", StringType, false)
          .add("soldeCrediteur4chiffres", StringType, false)
          .add("soldeDebiteur3chiffres", StringType, false)
          .add("soldeCrediteur3chiffres", StringType, false);            
             
       List<Dataset<Row>> ensembles = new ArrayList<>();
       
       comptesParSiret.foreach((ForeachFunction<ArrayList>) comptesAvecCumulsPourUnSiret -> {
          Dataset<Row> ensembleComptesSiret = session.createDataFrame(comptesAvecCumulsPourUnSiret, schema);
          ensembles.add(ensembleComptesSiret); 
       });
       
       Dataset<Row> union = null;
       
       for(Dataset<Row> ensemble : ensembles) {
          union = union != null ? union.union(ensemble) : union;
       }
       
       if (union == null) {
          // FIXME : I don't remember how to create an empty dataset with an underlying schema.
       }
    
       return union;
    }
    

    斯卡拉 . 我不使用它有两个原因:

    1. 我的主题是深入分析城市,地方政府,收支平衡。。。还有很多商业规则要遵循。它不能被一种只想专注于大数据主题的语言来处理。

    2. 能打电话吗 函数,但反之则不然。 火花 现在他们希望:它与 弹簧套

    火花塞 斯巴克 火花 在上定义的方法 斯卡拉