代码之家  ›  专栏  ›  技术社区  ›  nagendra

使用spark sql/Hive更新子表的记录

  •  0
  • nagendra  · 技术社区  · 6 年前

    我需要将子表的外键与父表同步。子表中的键远远领先于父表。因此,我需要更新子表ID以与父表同步。

    表A的架构:

    Id,name,age,height
    101,xxx,24,21
    102,aa,25,21
    103,bb,26,21
    104,cc,27,21
    105,dd,28,21
    

    表B的架构:

    Id,route
    101,22.21
    102,23.21
    213,34.55
    214,25.55
    216,22.44
    

    我想用103104105分别更新第二个表最后3行的ID。子表有9710369条记录,这些记录与父ID的顺序相同。我已经编写了spark程序如下。但不幸的是,spark sql作业需要花费大量时间来更新数据。我还将分区合并为一个分区以维持秩序

    val sourceIDs = "select id from parent table where  id > 5790681;
    val sourceRDD = hc.sql(sourceIDs)
    val ids = sourceRDD.map(r=>r.getLong(0)).collect().toList
    val ss = sc.broadcast(ids);
    
    val afterOffset = "select * from child table where id > 5790681;
    val afterOffsetRDD = hc.sql(afterOffset).coalesce(1)
    val count = new java.util.concurrent.atomic.AtomicInteger(0)
    val modsetChange = afterOffsetRDD.map({
        row => (ss.value(count.incrementAndGet),row.getInt(1))
    }).toDF()
    modsetChange.write.format("orc").mode(SaveMode.Overwrite).saveAsTable(targettable);
    

    注意:ID不是按顺序排列的,需要以相同的顺序获取父表,子表有15331个分区。

    我想知道这项工作到底在哪里花费了更多的时间?。 还有没有办法通过保持相同的顺序,在spark中以分布式方式实现上述转换?。 此外,我们可以使用Hive实现上述功能吗?。

    感谢您的帮助。

    提前谢谢。

    1 回复  |  直到 6 年前
        1
  •  0
  •   GPI    6 年前

    我想知道这项工作到底在哪里花费了更多的时间?

    最肯定的是,使用大规模并行大数据框架,只需在锁定的原语上以单线程方式工作(即使 AtomicLong 工作速度很快)相当慢。此外,要实现这一点,还需要一些耗时的步骤,例如 collect ing数据(可能无法正常工作,因为您的驱动器内存中可能包含的数据太多)。 总而言之,正如您所猜测的,这不是正确的方法。

    最重要的一点是,在spark编程模型中,使用原子计数器是无效的:

    val count = new java.util.concurrent.atomic.AtomicInteger(0)
    val modsetChange = afterOffsetRDD.map({
        row => (ss.value(count.incrementAndGet),row.getInt(1))
    }).toDF()
    

    这是因为,在spark编程模型中,rdd操作发生在“worker”节点上,而主程序由“driver”节点执行。在这里 count 因此,变量保存在驱动程序中 row => ... 代码由工作人员执行,这些工作人员甚至可能不在同一台计算机上。Spark通过运送 复制 属于 计数 给每个工人,意味着每个工人都有自己的 计数 。这在这里是可行的,因为通过将分区合并到一个分区,您只有一个工作者,但如果不是这样,您将得到不可预测的结果。

    所以,举个例子:永远不要在RDD操作中修改驱动端对象。除非这些是广播变量,您可以在文档中查看。

    还有没有办法通过保持相同的顺序,在spark中以分布式方式实现上述转换?

    是的,有。但是:您是否意识到您显示的代码实际上并没有维护任何顺序?(至少,不能保证它会这样做)?那是因为你的 select 条款没有 order by 。因此,执行引擎可以按照它想要的任何方式(例如,不按行顺序)对数据进行重新排序。只要没有SQL API,就无法实现任何目标 ORDER BY 卷入的 没有这一点,Spark不能保证RDD元素是有序的。您可能在某些情况下确实提供了分类行为,只是“默认情况下”没有提供。

    但是,如果原始输入是文本文件,则始终可以执行以下操作:

    sc.textFile(yourDataFile)
    

    并确保线路整齐。

    如果你已经解决了上述问题,我会这样做。

    使用纯RDD API:

    假设表A和表B为有序RDD:

    scala> val rddA = sc.parallelize(Seq((101, "xxx"), (102, "aa"), (103, "bb")))
    scala> val rddB = sc.parallelize(Seq((101, 22), (102, 23), (213, 34)))
    

    那么我想做的是使用 zipWithIndex 将行号添加到两个文件的每一行。然后,我将要求Spark通过将这些行号分组来连接两个RDD。

    val rddAWithPosition = rddA.zipWithIndex.map(_.swap)
    val rddBWithPosition = rddB.zipWithIndex.map(_.swap)
    val joinRDD = rddAWithPosition.join(rddBWithPosition)
    // What JoinRDD looks like : 
    scala> joinRDD.take(1)
    res2: Array[(Long, ((Int, String), (Int, Int)))] = Array((0,((101,xxx),(101,22))))
    

    您可以看到每个RDD元素现在是一个3元组,其中1)行号,2)tableA元素,3)table B元素。您现在可以根据需要重新安排。

    此外,我们可以使用Hive实现上述功能吗?。

    是的,如果您可以再次定义订单,并且 use the row_number() function 。然后,再次使用tableA和tableB的行号创建一个新表,然后在行号上联接。