代码之家  ›  专栏  ›  技术社区  ›  ParisaN

Hadoop:哪个映射器返回了哪个结果?

  •  0
  • ParisaN  · 技术社区  · 6 年前

    我是Hadoop的新手。我想运行一个MapReduce示例,并使用calculator mapper查看其结果。也就是说,我想知道,每个中间结果是由哪个映射器计算的?有可能吗?怎么用?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Abhinav    6 年前

    输入的文本文件示例:

    $bin/hadoop dfs-ls/wordcount/input/

    /字数/输入/文件02

    $bin/hadoop dfs-cat/wordcount/input/file01

    $bin/hadoop dfs-cat/wordcount/input/file02

    你好Hadoop再见Hadoop

    $bin/hadoop jar /usr/hdp/2.6x.x/hadoop-mapreduce/hadoo-mapreduce-示例.jar 字数/字数/输入/字数/输出

    输出:

    再见1

    再见1

    Hadoop 2

    现在,让我们看看mapper和reducer是如何在后端工作的:

    这个 制图员 (第14-26行) ,通过 ,一次处理一行,如指定的 TextInputFormat(第49行) . 然后,它通过StringTokenizer将行拆分为以空格分隔的标记,并发出一个键值对<,1>。

    第一张地图 排放物:

    <你好,1>

    <

    <再见,1>

    <世界,1>

    这个 第二张地图 发射:

    <你好,1>

    <

    <再见,1>

    <Hadoop,1>

    我们将在本教程稍后的部分中进一步了解为给定作业生成的贴图数量,以及如何以细粒度的方式控制它们。

    WordCount还指定 ). 因此,每个映射的输出在对键进行排序后,通过本地合并器(与作业配置中的Reducer相同)进行本地聚合。

    第一张地图 :

    <再见,1>

    <

    <世界,2>

    第二张地图 :

    <再见,1>

    <Hadoop,2>

    <你好,1>

    这个 减速器 实施 (第28-36行)

    作业的输出是 :

    <

    <再见,1>

    <Hadoop,2>

    <

    <

    run方法指定作业的各个方面,例如JobConf中的输入/输出路径(通过命令行传递)、键/值类型、输入/输出格式等。然后它调用 JobClient.runJob(第55行) 提交并监视其进度。

    1.  package org.myorg;
    
    2.  
    
    3.  import java.io.IOException;
    
    4.  import java.util.*;
    
    
    5.  
    
    6.  import org.apache.hadoop.fs.Path;
    
    7.  import org.apache.hadoop.conf.*;
    
    8.  import org.apache.hadoop.io.*;
    
    9.  import org.apache.hadoop.mapred.*;
    
    10. import org.apache.hadoop.util.*;
    
    11. 
    
    12. public class WordCount {
    
    13. 
    
    14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    15.      private final static IntWritable one = new IntWritable(1);
    
    16.      private Text word = new Text();
    
    17. 
    
    18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    
    19.        String line = value.toString();
    
    20.        StringTokenizer tokenizer = new StringTokenizer(line);
    
    21.        while (tokenizer.hasMoreTokens()) {
    
    22.          word.set(tokenizer.nextToken());
    
    23.          output.collect(word, one);
    
    24.        }
    
    25.      }
    
    26.    }
    
    27. 
    
    28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    
    
    30.        int sum = 0;
    
    31.        while (values.hasNext()) {
    
    32.          sum += values.next().get();
    
    33.        }
    
    34.        output.collect(key, new IntWritable(sum));
    
    35.      }
    
    36.    }
    
    37. 
    
    38.    public static void main(String[] args) throws Exception {
    
    39.      JobConf conf = new JobConf(WordCount.class);
    
    40.      conf.setJobName("wordcount");
    
    44. 
    
    45.      conf.setMapperClass(Map.class);
    
    46.      conf.setCombinerClass(Reduce.class);
    
    47.      conf.setReducerClass(Reduce.class);
    
    48. 
    
    49.      conf.setInputFormat(TextInputFormat.class);
    
    50.      conf.setOutputFormat(TextOutputFormat.class);
    
    51. 
    
    52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
    
    53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    
    54. 
    
    55.      JobClient.runJob(conf);
    
    57.    }
    
    58. }
    
    59.
    

    参考: MapReduce Tutorial