代码之家  ›  专栏  ›  技术社区  ›  Alexander

pyspark递归密钥搜索

  •  0
  • Alexander  · 技术社区  · 6 年前

    我有一个嵌套很深的json风格的结构,我需要在所有级别(最多7次)搜索给定的键来查找所有事件。级别0中始终存在一些数据,我需要将这些数据与 search_key 在任何级别都可以找到。我尝试过通过递归调用推送这些数据,并在返回时将其追加,但当我将其从标准python移到pyspark rdd时,遇到了堆和不可更改的类型问题。

    我的搜索功能如下:

    def search(input, search_key, results):
    
        if input:
            for i in input:
                if isinstance(i, list):
                    search(i, search_key, results)
    
                elif isinstance(i, dict):
                    for k, v in i.iteritems():
                        if k == search_key:
                            results.append(i)
                            continue
                        elif isinstance(v, list):
                            search(v, search_key, results)
                        elif isinstance(v, dict):
                            search(v, search_key, results)
    
            return results
    

    我称之为:

    origin_rdd = sc.parallelize(origin)
    concept_lambda = lambda c: search(c, term, [])
    results = origin_rdd.flatMap(concept_lambda)
    

    有谁能提出一种方法来捕获顶级数据,并将其作为每个对象的一部分显示在结果中?结果可以是0到n,因此一个由7个键组成的乘积始终显示在顶级中,然后显示搜索词集合的所有匹配项。然后,我想将结果rdd中的每一行转换为一个pyspark行,以便与一个pyspark数据帧一起使用。我还没有找到一个好的方法来从数据帧开始,而不是从RDD开始,或者将搜索功能应用到数据帧列,因为结构在其模式中是高度动态的,但是如果有人认为这是一个更好的路由,我很乐意听取建议。

    1 回复  |  直到 6 年前
        1
  •  0
  •   Alexander    6 年前

    当我的搜索成功时,使用deepcopy时,我可以通过切割和传递基地来解决我的问题。其他人试图做类似的事情,可以调整下面的切片。

    origin_rdd = sc.parallelize(origin)
    concept_lambda = lambda r: search(r[-1], r[0:9], term, [])
    results = origin_rdd.flatMap(concept_lambda)
    

    搜索功能

    def search(input, row_base, search_key, results):    
        if input:
            for i in input:
                if isinstance(i, list):
                    search(i, row_base, search_key, results)
                if isinstance(i, dict):
                    for k, v in iteritems(i):
                        if k == search_key:
                            row = copy.deepcopy(row_base)
                            row.append(i)
                            results.append(row)
                            continue
                        elif isinstance(v, list):
                            search(v, row_base, search_key, results)
                        elif isinstance(v, dict):
                            search(v, row_base, search_key, results)
    
        return results