|
@@ -1,407 +0,0 @@
|
|
|
-package org.apache.hadoop.benchmarks.mapred;
|
|
|
-
|
|
|
-import java.io.File;
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Iterator;
|
|
|
-import java.util.Random;
|
|
|
-
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.FileUtil;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.io.LongWritable;
|
|
|
-import org.apache.hadoop.io.UTF8;
|
|
|
-import org.apache.hadoop.mapred.JobClient;
|
|
|
-import org.apache.hadoop.mapred.JobConf;
|
|
|
-import org.apache.hadoop.mapred.TextInputFormat;
|
|
|
-import org.apache.hadoop.mapred.TextOutputFormat;
|
|
|
-
|
|
|
-/**
|
|
|
- * Runs a job multiple times and takesaverage of all runs.
|
|
|
- * @author sanjaydahiya
|
|
|
- *
|
|
|
- */
|
|
|
-public class MultiJobRunner {
|
|
|
-
|
|
|
- private String jarFile = null ; // "MRBenchmark.jar" ;
|
|
|
- private String input ;
|
|
|
- private String output ;
|
|
|
- private int numJobs = 1 ; // default value
|
|
|
- private static final Log LOG = LogFactory.getLog(MultiJobRunner.class);
|
|
|
- private int numMaps = 2;
|
|
|
- private int numReduces = 1;
|
|
|
- private int dataLines = 1;
|
|
|
- private boolean ignoreOutput = false ;
|
|
|
- private boolean verbose = false ;
|
|
|
-
|
|
|
- // just to print in the end
|
|
|
- ArrayList execTimes = new ArrayList();
|
|
|
-
|
|
|
- private static String context = "/mapred/benchmark";
|
|
|
-
|
|
|
- /**
|
|
|
- * Input is a local file.
|
|
|
- * @param input
|
|
|
- * @param output
|
|
|
- * @param jarFile
|
|
|
- */
|
|
|
- public MultiJobRunner(String input, String output, String jarFile){
|
|
|
- this.input = input ;
|
|
|
- this.output = output ;
|
|
|
- this.jarFile = jarFile ;
|
|
|
- }
|
|
|
-
|
|
|
- public String getInput() {
|
|
|
- return input;
|
|
|
- }
|
|
|
-
|
|
|
- public void setInput(String input) {
|
|
|
- this.input = input;
|
|
|
- }
|
|
|
-
|
|
|
- public String getJarFile() {
|
|
|
- return jarFile;
|
|
|
- }
|
|
|
-
|
|
|
- public void setJarFile(String jarFile) {
|
|
|
- this.jarFile = jarFile;
|
|
|
- }
|
|
|
-
|
|
|
- public String getOutput() {
|
|
|
- return output;
|
|
|
- }
|
|
|
-
|
|
|
- public void setOutput(String output) {
|
|
|
- this.output = output;
|
|
|
- }
|
|
|
-
|
|
|
- public int getNumJobs() {
|
|
|
- return numJobs;
|
|
|
- }
|
|
|
-
|
|
|
- public void setNumJobs(int numJobs) {
|
|
|
- this.numJobs = numJobs;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- public int getDataLines() {
|
|
|
- return dataLines;
|
|
|
- }
|
|
|
-
|
|
|
- public void setDataLines(int dataLines) {
|
|
|
- this.dataLines = dataLines;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isIgnoreOutput(){
|
|
|
- return this.ignoreOutput ;
|
|
|
- }
|
|
|
-
|
|
|
- public void setIgnoreOutput(boolean ignore){
|
|
|
- this.ignoreOutput = ignore ;
|
|
|
- }
|
|
|
-
|
|
|
- public void setVerbose(boolean verbose){
|
|
|
- this.verbose = verbose ;
|
|
|
- }
|
|
|
- public boolean getVerbose(){
|
|
|
- return this.verbose;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Prepare the jobConf.
|
|
|
- * @return
|
|
|
- */
|
|
|
- private JobConf setupJob(){
|
|
|
- JobConf job = new JobConf() ;
|
|
|
-
|
|
|
- job.addInputPath(new Path(context + "/input"));
|
|
|
-
|
|
|
- job.setInputFormat(TextInputFormat.class);
|
|
|
- job.setOutputFormat(TextOutputFormat.class);
|
|
|
-
|
|
|
- job.setInputKeyClass(LongWritable.class);
|
|
|
- job.setOutputValueClass(UTF8.class);
|
|
|
-
|
|
|
- job.setMapOutputKeyClass(UTF8.class);
|
|
|
- job.setMapOutputValueClass(UTF8.class);
|
|
|
-
|
|
|
- job.setOutputPath(new Path(output));
|
|
|
-
|
|
|
- if( null != jarFile ){
|
|
|
- job.setJar(jarFile);
|
|
|
- }
|
|
|
- job.setMapperClass(BenchmarkMapper.class);
|
|
|
- job.setReducerClass(BenchmarkReducer.class);
|
|
|
-
|
|
|
- job.setNumMapTasks(this.numMaps);
|
|
|
- job.setNumReduceTasks(this.numReduces);
|
|
|
-
|
|
|
- return job ;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Runs a MapReduce task, given number of times. The input to each task is the same file.
|
|
|
- * @param job
|
|
|
- * @param times
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void runJobInSequence(int times) throws IOException{
|
|
|
- Path intrimData = null ;
|
|
|
- Random rand = new Random();
|
|
|
-
|
|
|
- for( int i= 0;i<times;i++){
|
|
|
- // create a new job conf every time, reusing same object doesnt seem to work.
|
|
|
- JobConf job = setupJob();
|
|
|
-
|
|
|
- // give a new random name to output of the mapred tasks
|
|
|
- // TODO: see if something better can be done
|
|
|
- intrimData = new Path(context+"/temp/multiMapRedOutput_" +
|
|
|
- rand.nextInt() );
|
|
|
- job.setOutputPath(intrimData);
|
|
|
-
|
|
|
- // run the mapred task now
|
|
|
- LOG.info("Running job, Input : " + job.getInputPaths()[0] +
|
|
|
- " Output : " + job.getOutputPath());
|
|
|
- long curTime = System.currentTimeMillis();
|
|
|
- JobClient.runJob(job);
|
|
|
- execTimes.add(new Long(System.currentTimeMillis() - curTime));
|
|
|
-
|
|
|
- // pull the output out of DFS for validation
|
|
|
- File localOutputFile = File.createTempFile("MROutput" +
|
|
|
- new Random().nextInt(), ".txt" );
|
|
|
- String localOutputPath = localOutputFile.getAbsolutePath() ;
|
|
|
- localOutputFile.delete();
|
|
|
-
|
|
|
- if( ! ignoreOutput ){
|
|
|
- copyFromDFS(intrimData, localOutputPath);
|
|
|
- }
|
|
|
-
|
|
|
- // diff(input, localOutputPath);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Not using it.
|
|
|
- */
|
|
|
- private boolean diff(String path1, String path2) throws IOException{
|
|
|
- boolean ret = false ;
|
|
|
-
|
|
|
- return ret ;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Runs a sequence of map reduce tasks, output of each reduce is input
|
|
|
- * to next map. input should be a pre configured array of JobConfs.
|
|
|
- *
|
|
|
- */
|
|
|
- public Path runJobsInSequence(JobConf[] jobs) throws IOException{
|
|
|
-
|
|
|
- // input location = jobs[0] input loc
|
|
|
- Path finalOutput = null ;
|
|
|
-
|
|
|
- for( int i=0;i<jobs.length; i++){
|
|
|
- if( 0 != i ) {
|
|
|
- // run the first job in sequence.
|
|
|
- jobs[i].addInputPath(finalOutput) ;
|
|
|
- }
|
|
|
-
|
|
|
- JobClient.runJob(jobs[i]);
|
|
|
- finalOutput = jobs[i].getOutputPath();
|
|
|
- }
|
|
|
-
|
|
|
- return finalOutput;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- *
|
|
|
- * Copy the input file from local disk to DFS.
|
|
|
- * @param localFile
|
|
|
- * @param remotePath
|
|
|
- * @return
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private Path copyToDFS(String localFile, Path remotePath) throws IOException{
|
|
|
- if( null == remotePath){
|
|
|
- // use temp path under /mapred in DFS
|
|
|
- remotePath = new Path( context+"/input/MRBenchmark_" +
|
|
|
- new Random().nextInt()) ;
|
|
|
- }
|
|
|
- //new File(localPath).
|
|
|
- Configuration conf = new Configuration();
|
|
|
- FileSystem localFS = FileSystem.getLocal(conf);
|
|
|
- FileSystem remoteFS = FileSystem.get(conf);
|
|
|
-
|
|
|
- FileUtil.copy(localFS, new Path(localFile), remoteFS,
|
|
|
- remotePath, false, conf);
|
|
|
-
|
|
|
- if( ignoreOutput) {
|
|
|
- // delete local copy
|
|
|
- new File(localFile).delete();
|
|
|
- }
|
|
|
-
|
|
|
- return remotePath;
|
|
|
- }
|
|
|
-
|
|
|
- private void copyFromDFS(Path remotePath, String localPath)
|
|
|
- throws IOException{
|
|
|
-
|
|
|
- Configuration conf = new Configuration();
|
|
|
- FileSystem localFS = FileSystem.getLocal(conf);
|
|
|
- FileSystem remoteFS = FileSystem.get(conf);
|
|
|
-
|
|
|
- FileUtil.copy(remoteFS, remotePath,
|
|
|
- localFS, new Path(localPath), false, conf);
|
|
|
- }
|
|
|
-
|
|
|
- private void setupContext() throws IOException{
|
|
|
- FileSystem.get(new Configuration()).mkdirs(new Path(context));
|
|
|
- }
|
|
|
- private void clearContext() throws IOException{
|
|
|
- FileSystem.get(new Configuration()).delete(new Path(context));
|
|
|
- }
|
|
|
- /**
|
|
|
- * Run the benchmark.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public void run() throws IOException{
|
|
|
-
|
|
|
- setupContext();
|
|
|
- Path path = copyToDFS(input, null);
|
|
|
-
|
|
|
- long time = System.currentTimeMillis();
|
|
|
-
|
|
|
- try{
|
|
|
- runJobInSequence(numJobs);
|
|
|
- }finally{
|
|
|
- clearContext();
|
|
|
- }
|
|
|
-
|
|
|
- if( verbose ) {
|
|
|
- // Print out a report
|
|
|
- System.out.println("Total MapReduce tasks executed: " + this.numJobs);
|
|
|
- System.out.println("Total lines of data : " + this.dataLines);
|
|
|
- System.out.println("Maps : " + this.numMaps +
|
|
|
- " , Reduces : " + this.numReduces);
|
|
|
- }
|
|
|
- int i =0 ;
|
|
|
- long totalTime = 0 ;
|
|
|
- for( Iterator iter = execTimes.iterator() ; iter.hasNext();){
|
|
|
- totalTime += ((Long)iter.next()).longValue() ;
|
|
|
- if( verbose ) {
|
|
|
- System.out.println("Total time for task : " + ++i +
|
|
|
- " , = " + (Long)iter.next());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- long avgTime = totalTime / numJobs ;
|
|
|
- if( verbose ) {
|
|
|
- System.out.println("Avg time : " + avgTime);
|
|
|
- }
|
|
|
-
|
|
|
- System.out.println("DataLines Maps Reduces AvgTime");
|
|
|
- System.out.println(this.dataLines + ", " + this.numMaps + ", " +
|
|
|
- this.numReduces + ", " + avgTime);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public int getNumMaps() {
|
|
|
- return numMaps;
|
|
|
- }
|
|
|
-
|
|
|
- public void setNumMaps(int numMaps) {
|
|
|
- this.numMaps = numMaps;
|
|
|
- }
|
|
|
-
|
|
|
- public int getNumReduces() {
|
|
|
- return numReduces;
|
|
|
- }
|
|
|
-
|
|
|
- public void setNumReduces(int numReduces) {
|
|
|
- this.numReduces = numReduces;
|
|
|
- }
|
|
|
-
|
|
|
- public static void main (String[] args) throws IOException{
|
|
|
-
|
|
|
- String version = "MRBenchmark.0.0.1";
|
|
|
- String usage =
|
|
|
- "Usage: MultiJobRunner -inputLines noOfLines -jar jarFilePath " +
|
|
|
- "[-output dfsPath] [-times numJobs] -workDir dfsPath" +
|
|
|
- "[-inputType (ascending, descending, random)]" +
|
|
|
- " -maps numMaps -reduces numReduces -ignoreOutput -verbose" ;
|
|
|
-
|
|
|
- System.out.println(version);
|
|
|
-
|
|
|
- if (args.length == 0) {
|
|
|
- System.err.println(usage);
|
|
|
- System.exit(-1);
|
|
|
- }
|
|
|
-
|
|
|
- String output = "";
|
|
|
- String jarFile = null; //"MRBenchmark.jar" ;
|
|
|
- int numJobs = 1 ;
|
|
|
- int numMaps = 2;
|
|
|
- int numReduces = 1 ;
|
|
|
- int dataLines = 1 ;
|
|
|
- int inputType = GenData.ASCENDING ;
|
|
|
- boolean ignoreOutput = false ;
|
|
|
- boolean verbose = false ;
|
|
|
-
|
|
|
- for (int i = 0; i < args.length; i++) { // parse command line
|
|
|
- if (args[i].equals("-output")) {
|
|
|
- output = args[++i];
|
|
|
- }else if (args[i].equals("-jar")) {
|
|
|
- jarFile = args[++i];
|
|
|
- }else if (args[i].equals("-times")) {
|
|
|
- numJobs = Integer.parseInt(args[++i]);
|
|
|
- }else if(args[i].equals("-workDir")) {
|
|
|
- context = args[++i];
|
|
|
- }else if(args[i].equals("-maps")) {
|
|
|
- numMaps = Integer.parseInt(args[++i]);
|
|
|
- }else if(args[i].equals("-reduces")) {
|
|
|
- numReduces = Integer.parseInt(args[++i]);
|
|
|
- }else if(args[i].equals("-inputLines")) {
|
|
|
- dataLines = Integer.parseInt(args[++i]);
|
|
|
- }else if(args[i].equals("-inputType")) {
|
|
|
- String s = args[++i] ;
|
|
|
- if( s.equals("ascending")){
|
|
|
- inputType = GenData.ASCENDING ;
|
|
|
- }else if(s.equals("descending")){
|
|
|
- inputType = GenData.DESCENDING ;
|
|
|
- }else if(s.equals("random")){
|
|
|
- inputType = GenData.RANDOM ;
|
|
|
- }
|
|
|
- }else if(args[i].equals("-ignoreOutput")) {
|
|
|
- ignoreOutput = true ;
|
|
|
- }else if(args[i].equals("-verbose")) {
|
|
|
- verbose = true ;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- File inputFile = File.createTempFile("SortedInput_" +
|
|
|
- new Random().nextInt(),".txt" );
|
|
|
- GenData.generateText(dataLines, inputFile, inputType);
|
|
|
-
|
|
|
- MultiJobRunner runner = new MultiJobRunner(inputFile.getAbsolutePath(),
|
|
|
- output, jarFile );
|
|
|
- runner.setInput(inputFile.getAbsolutePath());
|
|
|
- runner.setNumMaps(numMaps);
|
|
|
- runner.setNumReduces(numReduces);
|
|
|
- runner.setDataLines(dataLines);
|
|
|
- runner.setIgnoreOutput(ignoreOutput);
|
|
|
- runner.setVerbose(verbose);
|
|
|
-
|
|
|
- if( 0 != numJobs ){
|
|
|
- runner.setNumJobs(numJobs);
|
|
|
- }
|
|
|
-
|
|
|
- try{
|
|
|
- runner.run();
|
|
|
- }catch(IOException e){
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|