代码之家  ›  专栏  ›  技术社区  ›  Niels Basjes

在Hadoop中链接多个MapReduce作业

  •  117
  • Niels Basjes  · 技术社区  · 14 年前

    在许多应用mapreduce的实际情况中,最终的算法最终是几个mapreduce步骤。

    即map1、reduce1、map2、reduce2等。

    所以您得到了最后一个reduce的输出,这是作为下一个映射的输入所需要的。

    中间数据是您(通常)在成功完成管道后不希望保留的数据。另外,因为这个中间数据通常是一些数据结构(如“map”或“set”),所以您不希望在编写和读取这些键值对时投入太多精力。

    在Hadoop中,推荐的方法是什么?

    是否有一个(简单的)示例演示如何以正确的方式处理中间数据,包括随后的清理?

    13 回复  |  直到 6 年前
        1
  •  53
  •   gsamaras    8 年前

    我认为雅虎开发者网络的本教程将帮助您: Chaining Jobs

    你用的是 JobClient.runJob() . 第一个作业的数据输出路径将成为第二个作业的输入路径。需要使用适当的代码将这些参数作为参数传递给作业,以便对它们进行分析并设置作业的参数。

    不过,我认为上述方法可能是现在较旧的mapred API所采用的方法,但它仍然可以工作。在新的MapReduce API中也有类似的方法,但我不确定它是什么。

    只要在作业完成后删除中间数据,就可以在代码中执行此操作。我以前做过的方法是使用类似于:

    FileSystem.delete(Path f, boolean recursive);
    

    其中路径是数据在HDF上的位置。您需要确保只有在没有其他作业需要此数据时才删除它。

        2
  •  17
  •   maxteneff    8 年前

    有很多方法可以做到。

    (1) 级联作业

    为第一个作业创建jobconf对象“job1”,并设置所有参数,其中“input”为input directory,“temp”为output directory。执行此作业:

    JobClient.run(job1).
    

    在它的正下方,为第二个作业创建jobconf对象“job2”,并设置所有参数,其中“temp”为inputdirectory,“output”为output directory。执行此作业:

    JobClient.run(job2).
    

    (2) 创建两个jobconf对象,并按如下方式设置其中的所有参数 (1) 除了不使用jobclient.run。

    然后使用jobconf作为参数创建两个job对象:

    Job job1=new Job(jobconf1); 
    Job job2=new Job(jobconf2);
    

    使用JobControl对象,可以指定作业依赖项,然后运行作业:

    JobControl jbcntrl=new JobControl("jbcntrl");
    jbcntrl.addJob(job1);
    jbcntrl.addJob(job2);
    job2.addDependingJob(job1);
    jbcntrl.run();
    

    (3) 如果需要类似map+reduce map*的结构,可以使用Hadoop 0.19及更高版本附带的chainmapper和chainreducer类。

    干杯

        3
  •  7
  •   cwensel    14 年前

    实际上有很多方法可以做到这一点。我集中在两个问题上。

    一个是经由浅滩( http://github.com/cwensel/riffle )一个注释库,用于识别依赖的事物并按依赖(拓扑)顺序“执行”。

    或者可以在层叠中使用层叠(和mapreduceflow)( http://www.cascading.org/ )未来的版本将支持riffle注释,但现在它对raw jobconf jobs非常有用。

    这方面的一个变种是根本不手工管理mr作业,而是使用级联API开发应用程序。然后,jobconf和job chaining通过级联计划器和流类进行内部处理。

    通过这种方式,您可以将时间集中在问题上,而不是管理Hadoop作业等机制上。您甚至可以在顶部分层不同的语言(如Clojure或JRuby),以进一步简化您的开发和应用程序。 http://www.cascading.org/modules.html

        4
  •  6
  •   psrklr    9 年前

    我一个接一个地使用jobconf对象完成了链接工作。我以wordcount为例来链接这些工作。一个作业计算出给定输出中一个单词a重复的次数。第二个作业将第一个作业输出作为输入,并计算出给定输入中的单词总数。下面是需要放在驱动程序类中的代码。

        //First Job - Counts, how many times a word encountered in a given file 
        JobConf job1 = new JobConf(WordCount.class);
        job1.setJobName("WordCount");
    
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
    
        job1.setMapperClass(WordCountMapper.class);
        job1.setCombinerClass(WordCountReducer.class);
        job1.setReducerClass(WordCountReducer.class);
    
        job1.setInputFormat(TextInputFormat.class);
        job1.setOutputFormat(TextOutputFormat.class);
    
        //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
        FileInputFormat.setInputPaths(job1, new Path("input_data"));
    
        //"first_job_output" contains data that how many times a word occurred in the given file
        //This will be the input to the second job. For second job, input data name should be
        //"first_job_output". 
        FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));
    
        JobClient.runJob(job1);
    
    
        //Second Job - Counts total number of words in a given file
    
        JobConf job2 = new JobConf(TotalWords.class);
        job2.setJobName("TotalWords");
    
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
    
        job2.setMapperClass(TotalWordsMapper.class);
        job2.setCombinerClass(TotalWordsReducer.class);
        job2.setReducerClass(TotalWordsReducer.class);
    
        job2.setInputFormat(TextInputFormat.class);
        job2.setOutputFormat(TextOutputFormat.class);
    
        //Path name for this job should match first job's output path name
        FileInputFormat.setInputPaths(job2, new Path("first_job_output"));
    
        //This will contain the final output. If you want to send this jobs output
        //as input to third job, then third jobs input path name should be "second_job_output"
        //In this way, jobs can be chained, sending output one to other as input and get the
        //final output
        FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));
    
        JobClient.runJob(job2);
    

    运行这些作业的命令是:

    垃圾桶/Hadoop罐总计单词。

    我们需要为命令提供最终的作业名称。在上述情况下,它是totalwords。

        5
  •  5
  •   Aniruddha Sinha    8 年前

    您可以按照代码中给出的方式运行mr chain。

    请注意 :仅提供了驱动程序代码

    public class WordCountSorting {
    // here the word keys shall be sorted
          //let us write the wordcount logic first
    
          public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
                //THE DRIVER CODE FOR MR CHAIN
                Configuration conf1=new Configuration();
                Job j1=Job.getInstance(conf1);
                j1.setJarByClass(WordCountSorting.class);
                j1.setMapperClass(MyMapper.class);
                j1.setReducerClass(MyReducer.class);
    
                j1.setMapOutputKeyClass(Text.class);
                j1.setMapOutputValueClass(IntWritable.class);
                j1.setOutputKeyClass(LongWritable.class);
                j1.setOutputValueClass(Text.class);
                Path outputPath=new Path("FirstMapper");
                FileInputFormat.addInputPath(j1,new Path(args[0]));
                      FileOutputFormat.setOutputPath(j1,outputPath);
                      outputPath.getFileSystem(conf1).delete(outputPath);
                j1.waitForCompletion(true);
                      Configuration conf2=new Configuration();
                      Job j2=Job.getInstance(conf2);
                      j2.setJarByClass(WordCountSorting.class);
                      j2.setMapperClass(MyMapper2.class);
                      j2.setNumReduceTasks(0);
                      j2.setOutputKeyClass(Text.class);
                      j2.setOutputValueClass(IntWritable.class);
                      Path outputPath1=new Path(args[1]);
                      FileInputFormat.addInputPath(j2, outputPath);
                      FileOutputFormat.setOutputPath(j2, outputPath1);
                      outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                      System.exit(j2.waitForCompletion(true)?0:1);
          }
    
    }
    

    顺序是

    ( JOB1 )映射->减少->。( 作业2 地图
    这样做是为了将键排序,但是还有更多的方法,例如使用treemap
    不过,我想把你的注意力集中在工作被束缚的方式上!!
    谢谢你

        6
  •  4
  •   user300313    14 年前

    您可以使用Oozie进行Barch处理MapReduce作业。 http://issues.apache.org/jira/browse/HADOOP-5303

        7
  •  3
  •   Christie English    13 年前

    ApacheMahout项目中有一些例子将多个MapReduce作业链接在一起。其中一个示例可以在以下位置找到:

    推荐人job.java

    http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

        8
  •  3
  •   akjoshi HCP    11 年前

    我们可以利用 waitForCompletion(true) 定义作业之间依赖关系的作业方法。

    在我的场景中,我有3个相互依赖的工作。在Driver类中,我使用了下面的代码,它按预期工作。

    public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
    
            CCJobExecution ccJobExecution = new CCJobExecution();
    
            Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
            Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
            Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);
    
            System.out.println("****************Started Executing distanceTimeFraudJob ================");
            distanceTimeFraudJob.submit();
            if(distanceTimeFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed DistanceTimeFraudJob================= ");
                System.out.println("=================Started Executing spendingFraudJob ================");
                spendingFraudJob.submit();
                if(spendingFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed spendingFraudJob================= ");
                    System.out.println("=================Started locationFraudJob================= ");
                    locationFraudJob.submit();
                    if(locationFraudJob.waitForCompletion(true))
                    {
                        System.out.println("=================Completed locationFraudJob=================");
                    }
                }
            }
        }
    
        9
  •  2
  •   Xavi    8 年前

    新的类org.apache.hadoop.mapreduce.lib.chain.chainmapper有助于这个场景。

        10
  •  1
  •   Pranab    13 年前

    虽然有复杂的基于服务器的Hadoop工作流引擎,例如OOZIE,但我有一个简单的Java库,它能够执行多个Hadoop作业作为工作流。定义作业间依赖关系的作业配置和工作流在JSON文件中配置。所有东西都是外部配置的,并且不需要在现有的map reduce实现中进行任何更改就可以作为工作流的一部分。

    详情可在这里找到。源代码和JAR在GitHub中可用。

    http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

    普拉纳布

        11
  •  1
  •   stholy    11 年前

    我认为Oozie可以帮助后续的工作直接从上一个工作中获得信息。这样可以避免使用JobControl执行I/O操作。

        12
  •  1
  •   Erik Schmiegelow    10 年前

    如果您想以编程方式链接作业,您将需要使用JobControl。使用非常简单:

        JobControl jobControl = new JobControl(name);
    

    然后添加可控的作业实例。ControlledJob定义了一个具有依赖性的作业,从而自动插入输入和输出以适应作业的“链”。

        jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));
    
        jobControl.run();
    

    启动链条。你要把它放在一个特殊的线程中。这样可以检查链条运行时的状态:

        while (!jobControl.allFinished()) {
            System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
            System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
            System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
            List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
            System.out.println("Jobs in success state: " + successfulJobList.size());
            List<ControlledJob> failedJobList = jobControl.getFailedJobList();
            System.out.println("Jobs in failed state: " + failedJobList.size());
        }
    
        13
  •  0
  •   Neha Kumari    6 年前

    正如您在需求中提到的,您希望mrjob1的O/P是mrjob2的I/P,等等,您可以考虑对此用例使用oozie工作流。您也可以考虑将中间数据写入HDF,因为它将被下一个mrjob使用。作业完成后,您可以清理中间数据。

    <start to="mr-action1"/>
    <action name="mr-action1">
       <!-- action for MRJob1-->
       <!-- set output path = /tmp/intermediate/mr1-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <action name="mr-action2">
       <!-- action for MRJob2-->
       <!-- set input path = /tmp/intermediate/mr1-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <action name="success">
            <!-- action for success-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <action name="fail">
            <!-- action for fail-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <end name="end"/>