代码之家  ›  专栏  ›  技术社区  ›  Darshan Mehta

在数据流中执行聚合

  •  0
  • Darshan Mehta  · 技术社区  · 6 年前

    Bigtable filter 并执行聚合。我使用以下配置来获取 connection (执行范围扫描等):

    Connection connection = BigtableConfiguration.connect(projectId, instanceId);
    Table table = connection.getTable(TableName.valueOf(tableId)); 
    
    table.getScanner(<a scanner with filter>);
    

    ResultScanner 我可以迭代这些行。但是,我要做的是,对某些列执行聚合并获取值。我想做的SQL等价物是:

    SELECT SUM(A), SUM(B)
    FROM table
    WHERE C = D;
    

    HBase AggregationClient (javadoc文件) here )但是,它需要 Configuration 大表 (这样我就不需要使用低级别的Hbase api)。

    我查看了文档,没有找到任何可以这样做的东西(在Java中)。有人能分享一个例子吗 aggregation 在BigTable上使用(非行键或任何)筛选器。

    2 回复  |  直到 6 年前
        1
  •  2
  •   Solomon Duskis    6 年前

    WHERE C = D ,因此这种类型的处理通常最好在客户端完成。

    AggregationClient 是HBase协处理器。Cloud Bigtable不支持协处理器。

    如果您想将Cloud Bigtable用于这种类型的聚合,则必须使用 table.scan() 还有你自己的逻辑。如果规模足够大,则必须使用Dataflow或BigQuery来执行聚合。

        2
  •  1
  •   ACGray    6 年前

    PCollection<TableRow> rows = p.apply(BigQueryIO.readTableRows()
      .fromQuery("SELECT A, B FROM table;"));
    
    PCollection<KV<String, Integer>> valuesA =
      rows.apply(
        MapElements.into(TypeDescriptors.kvs(
          TypeDescriptors.strings(),
          TypeDescriptors.integers()))
          .via((TableRow row) -> KV.of(
            "A", (Integer) row.getF().get(0).getV())));
    
    PCollection<KV<String, Integer>> valuesB =
      rows.apply(
        MapElements.into(TypeDescriptors.kvs(
          TypeDescriptors.strings(),
          TypeDescriptors.integers()))
          .via((TableRow row) -> KV.of(
            "B", (Integer) row.getF().get(1).getV())));
    
    PCollection<KV<String, Integer>> sums =
      PCollectionList.of(sumOfA).and(sumOfB)
        .apply(Flatten.pCollections())
        .apply(Sum.integersPerKey());