代码之家  ›  专栏  ›  技术社区  ›  Philipp

使用python的apache beam中PCollection内几个字段的最大值和最小值

  •  0
  • Philipp  · 技术社区  · 6 年前

    我通过python SDK使用apache beam,存在以下问题:

    我有一个大约有1百万个条目的PCollection,PCollection中的每个条目看起来像一个2元组列表 [(key1,value1),(key2,value2),...] 长度约150。我需要在每个键的PCollection的所有条目中找到最大值和最小值,以便规范化这些值。

    理想情况下,最好使用元组列表获得PCollection [(key,max_value,min_value),...] 然后就可以很容易地进行规范化 [(key1,norm_value1),(key2,norm_value2),...] 哪里 norm_value = (value - min) / (max - min)

    目前我只能用手分别为每个键进行操作,这既不方便也不可持续,所以任何建议都会很有帮助。

    1 回复  |  直到 6 年前
        1
  •  4
  •   Guillem Xercavins    6 年前

    我决定尝试使用自定义 CombineFn 函数确定每个键的最小值和最大值。然后,使用 CoGroupByKey 并应用所需的映射来规范化值。

    """Normalize PCollection values."""
    
    import logging
    import argparse
    import sys
    
    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions
    
    
    # custom CombineFn that outputs min and max value
    class MinMaxFn(beam.CombineFn):
      # initialize min and max values (I assumed int type)
      def create_accumulator(self):
        return (sys.maxint, 0)
    
      # update if current value is a new min or max      
      def add_input(self, min_max, input):
        (current_min, current_max) = min_max
        return min(current_min, input), max(current_max, input)
    
      def merge_accumulators(self, accumulators):
        return accumulators
    
      def extract_output(self, min_max):
        return min_max
    
    
    def run(argv=None):
      """Main entry point; defines and runs the pipeline."""
      parser = argparse.ArgumentParser()
      parser.add_argument('--output',
                          dest='output',
                          required=True,
                          help='Output file to write results to.')
      known_args, pipeline_args = parser.parse_known_args(argv)
    
      pipeline_options = PipelineOptions(pipeline_args)
      p = beam.Pipeline(options=pipeline_options)
    
      # create test data
      pc = [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)]
    
      # first run through data to apply custom combineFn and determine min/max per key
      minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())
    
      # group input data by key and append corresponding min and max 
      merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()
    
      # apply mapping to normalize values according to 'norm_value = (value - min) / (max - min)'
      normalized = merged | 'Normalize values' >> beam.Map(lambda (a, (b, c)): (a, [float(val - c[0][0][0])/(c[0][0][1] -c[0][0][0]) for val in b]))
    
      # write results to output file
      normalized | 'Write results' >> WriteToText(known_args.output)
    
      result = p.run()
      result.wait_until_finish()
    
    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
    

    该代码段可以与一起运行 python SCRIPT_NAME.py --output OUTPUT_FILENAME .我的测试数据(按键分组)是:

    ('foo', [1, 5])
    ('bar', [5, 9, 2])
    

    CombineFn将按密钥最小值和最大值返回:

    ('foo', [(1, 5)])
    ('bar', [(2, 9)])
    

    按键联接/合并组操作的输出:

    ('foo', ([1, 5], [[(1, 5)]]))
    ('bar', ([5, 9, 2], [[(2, 9)]]))
    

    正常化后:

    ('foo', [0.0, 1.0])
    ('bar', [0.42857142857142855, 1.0, 0.0])
    

    这只是一个简单的测试,所以我确信它可以针对所提到的数据量进行优化,但它似乎是一个起点。考虑到可能需要进一步考虑(即,如果最小值=最大值,则避免除以零)