
IOException after map, before reduce

  VeLKerr  ·  10 years ago

    I have a dataset in which every record contains two fields:

    • URL (without a prefix);
    • lifetime (in seconds).

    I want to compute the average lifetime for every domain. I.e., if I have the following two records:

    hadoop.apache.org/docs/current 22118400
    hadoop.apache.org/docs/current/api/org/ 27820800
    

    I should get this answer (the lifetimes are averaged and converted from seconds to days: (22118400 + 27820800) / 2 = 24,969,600 s ≈ 289 days):

    hadoop.apache.org 289
    

    For these calculations I wrote a Hadoop job:

    package ru.bdata.siteslifes;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import ru.bdata.siteslifes.arrays.IntArrayWritable;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    public class AvgSiteLifeCounter extends Configured implements Tool {

        public static class DomainMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
    
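            // Emit (domain, lifetime): the domain is everything before the first '/' of the URL.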
            @Override
            public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException{
                String url = key.toString();
                context.write(new Text(url.substring(0, url.indexOf('/'))), value);
            }
        }
    
        public static class AvgCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
            private static final int SEC_IN_DAY = 86400;
    
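            // Sum the lifetimes seen for one domain and emit their average, converted from seconds to days.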
            @Override
            public void reduce(Text key, Iterable<IntWritable> value, Context context)
                    throws IOException,InterruptedException{
                float sum = 0;
                int cnt = 0;
                Iterator<IntWritable> it = value.iterator();
                while (it.hasNext()){
                    sum += it.next().get();
                    cnt++;
                }
                context.write(key, new IntWritable(Math.round(sum / (cnt * SEC_IN_DAY))));
            }
        }
    
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            Job job = new Job(conf);
            job.setJarByClass(AvgSiteLifeCounter.class);
    
            job.setMapperClass(DomainMapper.class);
            job.setCombinerClass(AvgCombiner.class);
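            // The default Reducer is the identity reducer: it simply forwards the pairs produced by the combiner.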
            job.setReducerClass(Reducer.class);
    
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setNumReduceTasks(8);
    
            SequenceFileInputFormat.addInputPath(job, new Path(strings[0]));
            SequenceFileOutputFormat.setOutputPath(job, new Path(strings[1]));
    
            return job.waitForCompletion(true)? 0: 1;
        }
    }
    

    When I run the program on the cluster, the map part works fine, but before the reducer part starts I see this exception:

    java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1305)
        at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:74)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.ja...
    

    How should I change the code so that the reduce part works as well?

    Since this job is only one part of a larger Hadoop task, both the input and the output data are represented as binary files ( SequenceFile ).
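
    For reference, such an input file can be produced with the standard SequenceFile.Writer API; below is a minimal sketch for creating a small test input (the path /tmp/lifetimes.seq is made up for illustration, and the key/value classes match what DomainMapper expects):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/lifetimes.seq");  // hypothetical test path
    // Keys and values must match the mapper's input types: Text / IntWritable.
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class);
    try {
        writer.append(new Text("hadoop.apache.org/docs/current"), new IntWritable(22118400));
        writer.append(new Text("hadoop.apache.org/docs/current/api/org/"), new IntWritable(27820800));
    } finally {
        writer.close();
    }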

    P.S. As you can see, I don't use LongWritable anywhere, only IntWritable. But in the exception log I see LongWritable.

    2 answers  |  10 years ago
        1  ·  VeLKerr  ·  10 years ago

    After the map phase Hadoop writes its results to temporary files, and afterwards the reducers read this data from them. In the run() method, the key and value classes for the data that has to be read back from these temporary files were not set. So after setting the key and value classes for the mapper's results, my code started working. I.e., I added lines like these to the run() method:

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class); 
    
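    (With the new org.apache.hadoop.mapreduce API used in the question, the equivalent calls are made on the Job object; since the map output classes are already declared in run(), a minimal sketch of the remaining declarations, assuming the final reducer output is also Text / IntWritable:)

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
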
        2  ·  STLSOFT Big Data Training  ·  10 years ago

    Hadoop throws this IOException when there is a mismatch between the types the driver code declared for the job and the types it actually receives at runtime, which is somewhat confusing.
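
    To make that matching concrete, the types declared in the driver have to line up with the generic parameters of the Mapper and Reducer; a minimal sketch for the job in the question (assuming Text / IntWritable throughout):

    // Mapper<Text, IntWritable, Text, IntWritable> -> intermediate (map output) types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Reducer<Text, IntWritable, Text, IntWritable> -> final output types;
    // when these are not set, Hadoop falls back to LongWritable / Text, which is
    // exactly the "Text is not class LongWritable" complaint in the stack trace.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);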