Bläddra i källkod

HADOOP-3135. Get the system directory from the JobTracker instead of from the conf. Contributed by Subramaniam Krishnan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663934 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 år sedan
förälder
incheckning
90e3316002

+ 3 - 0
CHANGES.txt

@@ -492,6 +492,9 @@ Trunk (unreleased changes)
     HADOOP-3496.  Fix failure in TestHarFileSystem.testArchives due to change
     in HADOOP-3095.  (tomwhite)
 
+    HADOOP-3135. Get the system directory from the JobTracker instead of from
+    the conf. (Subramaniam Krishnan via ddas)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 2 - 2
src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java

@@ -51,12 +51,12 @@ class CompletedJobStatusStore implements Runnable {
   private static long HOUR = 1000 * 60 * 60;
   private static long SLEEP_TIME = 1 * HOUR;
 
-  CompletedJobStatusStore(Configuration conf) throws IOException {
+  CompletedJobStatusStore(Configuration conf, FileSystem fs) throws IOException {
     active =
       conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
 
     if (active) {
-      fs = FileSystem.get(conf);
+      this.fs = fs;
       retainTime =
         conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
 

+ 11 - 5
src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -43,8 +43,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * version 11 changes string to JobID in getTaskCompletionEvents().
    * version 12 changes the counters representation for HADOOP-1915
    * version 13 added call getBuildVersion() for HADOOP-236
+   * Version 14: added getSystemDir for HADOOP-3135 (getFilesystemName is still declared)
    */
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -73,13 +74,13 @@ interface InterTrackerProtocol extends VersionedProtocol {
   HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                               boolean initialContact, boolean acceptNewTasks, short responseId)
     throws IOException;
-
+  
   /**
    * The task tracker calls this once, to discern where it can find
    * files referred to by the JobTracker
    */
   public String getFilesystemName() throws IOException;
-  
+
   /**
    * Report a problem to the job tracker.
    * @param taskTracker the name of the task tracker
@@ -102,6 +103,13 @@ interface InterTrackerProtocol extends VersionedProtocol {
    */
   TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId
       , int maxEvents) throws IOException;
+
+  /**
+   * Grab the jobtracker system directory path where job-specific files are to be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public String getSystemDir();
   
   
   /**
@@ -109,5 +117,3 @@ interface InterTrackerProtocol extends VersionedProtocol {
    */
   public String getBuildVersion() throws IOException;
 }
-
-

+ 36 - 3
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -334,6 +334,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
 
   JobSubmissionProtocol jobSubmitClient;
+  Path sysDir = null;
   
   FileSystem fs = null;
 
@@ -406,8 +407,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
    */
   public synchronized FileSystem getFs() throws IOException {
     if (this.fs == null) {
-      String fsName = jobSubmitClient.getFilesystemName();
-      this.fs = FileSystem.getNamed(fsName, getConf());
+      Path sysDir = getSystemDir();
+      this.fs = sysDir.getFileSystem(getConf());
     }
     return fs;
   }
@@ -664,7 +665,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
      */
     
     JobID jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(job.getSystemDir(), jobId.toString());
+    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
     configureCommandLineOptions(job, submitJobDir, submitJarFile);
@@ -1419,6 +1420,38 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     }
   }
 
+  /**
+   * Get status information about the max available Maps in the cluster.
+   *  
+   * @return the max available Maps in the cluster
+   * @throws IOException
+   */
+  public int getDefaultMaps() throws IOException {
+    return getClusterStatus().getMaxMapTasks();
+  }
+
+  /**
+   * Get status information about the max available Reduces in the cluster.
+   *  
+   * @return the max available Reduces in the cluster
+   * @throws IOException
+   */
+  public int getDefaultReduces() throws IOException {
+    return getClusterStatus().getMaxReduceTasks();
+  }
+
+  /**
+   * Grab the jobtracker system directory path where job-specific files are to be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public Path getSystemDir() {
+    if (sysDir == null) {
+      sysDir = new Path(jobSubmitClient.getSystemDir());
+    }
+    return sysDir;
+  }
+
   /**
    */
   public static void main(String argv[]) throws Exception {

+ 2 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -186,10 +186,12 @@ public class JobConf extends Configuration {
   }
 
   /**
    * Get the system directory where job-specific files are to be placed.
    * 
    * @return the system directory where job-specific files are to be placed.
+   * @deprecated Use {@link JobClient#getSystemDir()} instead.
    */
+  @Deprecated
   public Path getSystemDir() {
     return new Path(get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
   }

+ 7 - 7
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -172,8 +172,9 @@ class JobInProgress {
                                                       +"/"+jobid + ".xml");
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
                                                       +"/"+ jobid + ".jar");
-    FileSystem fs = FileSystem.get(default_conf);
-    Path jobFile = new Path(default_conf.getSystemDir(), jobid + "/job.xml");
+    Path sysDir = new Path(this.jobtracker.getSystemDir());
+    FileSystem fs = sysDir.getFileSystem(default_conf);
+    Path jobFile = new Path(sysDir, jobid + "/job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
@@ -302,7 +303,8 @@ class JobInProgress {
     //
     String jobFile = profile.getJobFile();
 
-    FileSystem fs = FileSystem.get(conf);
+    Path sysDir = new Path(this.jobtracker.getSystemDir());
+    FileSystem fs = sysDir.getFileSystem(conf);
     DataInputStream splitFile =
       fs.open(new Path(conf.get("mapred.job.split.file")));
     JobClient.RawSplit[] splits;
@@ -1623,12 +1625,10 @@ class JobInProgress {
 
       // JobClient always creates a new directory with job files
       // so we remove that directory to cleanup
-      FileSystem fs = FileSystem.get(conf);
-      fs.delete(new Path(profile.getJobFile()).getParent(), true);
-        
       // Delete temp dfs dirs created if any, like in case of 
       // speculative execution of reduces.
-      Path tempDir = new Path(conf.getSystemDir(), jobId.toString()); 
+      Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
+      FileSystem fs = tempDir.getFileSystem(conf);
       fs.delete(tempDir, true); 
 
       // delete the temporary directory in output directory

+ 11 - 3
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -39,8 +39,9 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * Version 7: added getAllJobs for HADOOP-2487
   * Version 8: change {job|task}id's to use corresponding objects rather than strings.
    * Version 9: change the counter representation for HADOOP-1915
+   * Version 10: added getSystemDir for HADOOP-3135
    */
-  public static final long versionID = 9L;
+  public static final long versionID = 10L;
 
   /**
    * Allocate a name for the job.
@@ -141,6 +142,13 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * @return an array of the diagnostic messages
    */
   public String[] getTaskDiagnostics(TaskAttemptID taskId) 
-  throws IOException;  
-  
+  throws IOException;
+
+  /**
+   * Grab the jobtracker system directory path where job-specific files are to be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public String getSystemDir();  
+
 }

+ 14 - 3
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -698,8 +698,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     this.conf.set("mapred.job.tracker.http.address", 
         infoBindAddress + ":" + this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
-    this.systemDir = jobConf.getSystemDir();
-
+    
     while (true) {
       try {
         // if we haven't contacted the namenode go ahead and do it
@@ -708,6 +707,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         }
         // clean up the system dir, which will only work if hdfs is out of 
         // safe mode
+        if(systemDir == null) {
+          systemDir = new Path(getSystemDir());    
+        }
         fs.delete(systemDir, true);
         if (FileSystem.mkdirs(fs, systemDir, 
             new FsPermission(SYSTEM_DIR_PERMISSION))) {
@@ -747,7 +749,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf);
+    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
 
     LOG.info("Starting RUNNING");
   }
@@ -1990,6 +1992,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     return v.toArray(new JobStatus[v.size()]);
   }
     
+  /**
+   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
+   */
+  public String getSystemDir() {
+    Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
+    return fs.makeQualified(sysDir).toString();
+  }
+  
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////
@@ -2322,4 +2332,5 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       System.exit(-1);
     }
   }
+
 }

+ 12 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -75,7 +75,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     }
     
     public Job(JobID jobid, JobConf conf) throws IOException {
-      this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
+      this.file = new Path(getSystemDir(), jobid + "/job.xml");
       this.id = jobid;
       this.mapoutputFile = new MapOutputFile(jobid);
       this.mapoutputFile.setConf(conf);
@@ -149,6 +149,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
                                    + " doesnt exist " );
             }
           }
+          map.setJobFile(localFile.toString());
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
@@ -192,6 +193,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
                                        + " doesnt exist ");
                 }
               }
+              reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -387,4 +389,13 @@ class LocalJobRunner implements JobSubmissionProtocol {
   		throws IOException{
 	  return new String [0];
   }
+
+  /**
+   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
+   */
+  public String getSystemDir() {
+    Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
+    return fs.makeQualified(sysDir).toString();
+  }
+
 }

+ 6 - 3
src/java/org/apache/hadoop/mapred/MultiFileSplit.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
@@ -89,10 +90,12 @@ public class MultiFileSplit implements InputSplit {
 
   public String[] getLocations() throws IOException {
     HashSet<String> hostSet = new HashSet<String>();
+    JobClient jClient = new JobClient(job);
+    FileSystem fs = jClient.getFs();
     for (Path file : paths) {
-      BlockLocation[] blkLocations = FileSystem.get(job)
-        .getFileBlockLocations(file, 0, FileSystem.get(job)
-        .getFileStatus(file).getLen());
+      FileStatus status = fs.getFileStatus(file);
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(status,
+                                          0, status.getLen());
       if (blkLocations != null && blkLocations.length > 0) {
         addToSet(hostSet, blkLocations[0].getHosts());
       }

+ 4 - 3
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -641,9 +641,10 @@ public class TaskTracker
     
     JobID jobId = t.getJobID();
     Path jobFile = new Path(t.getJobFile());
-    // Get size of JobFile.
-    // size is -1 if not present.
-    FileSystem fs = FileSystem.getNamed(jobClient.getFilesystemName(),fConf);
+    // Get sizes of JobFile and JarFile
+    // sizes are -1 if they are not present.
+    Path systemDir = new Path(jobClient.getSystemDir());
+    FileSystem fs = systemDir.getFileSystem(fConf);
     FileStatus status = null;
     long jobFileSize = -1;
     try {

+ 8 - 2
src/java/org/apache/hadoop/util/CopyFiles.java

@@ -23,7 +23,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Stack;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -884,7 +889,8 @@ public class CopyFiles implements Tool {
     final boolean updateORoverwrite = setBooleans(jobConf, flags);
 
     final String randomId = getRandomId();
-    Path jobDirectory = new Path(jobConf.getSystemDir(), NAME + "_" + randomId);
+    JobClient jClient = new JobClient(jobConf);
+    Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
 
     FileSystem dstfs = destPath.getFileSystem(conf);

+ 2 - 5
src/java/org/apache/hadoop/util/HadoopArchives.java

@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.util;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
@@ -54,11 +52,9 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 
@@ -315,7 +311,8 @@ public class HadoopArchives implements Tool {
     FileOutputFormat.setOutputPath(conf, outputPath);
     conf.set(DST_DIR_LABEL, outputPath.toString());
     final String randomId = CopyFiles.getRandomId();
-    Path jobDirectory = new Path(conf.getSystemDir(), NAME + "_" + randomId);
+    Path jobDirectory = new Path(new JobClient().getSystemDir(),
+                          NAME + "_" + randomId);
     conf.set(JOB_DIR_LABEL, jobDirectory.toString());
     //get a tmp directory for input splits
     FileSystem jobfs = jobDirectory.getFileSystem(conf);

+ 3 - 18
src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java

@@ -38,22 +38,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -151,9 +135,10 @@ public class GenericMRLoadGenerator extends Configured implements Tool {
       confRandom(job);
     } else if (null != job.getClass("mapred.indirect.input.format", null)) {
       // specified IndirectInputFormat? Build src list
-      Path sysdir = job.getSystemDir();
+      JobClient jClient = new JobClient(job);  
+      Path sysdir = jClient.getSystemDir();
       Random r = new Random();
-      Path indirInputFile = new Path(job.getSystemDir(),
+      Path indirInputFile = new Path(sysdir,
           Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
       job.set("mapred.indirect.input.file", indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(