
HADOOP-1473. Make job ids unique across jobtracker restarts. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@555383 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting
commit 11637cb95b

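Before this change a job id was just "job_" plus a zero-padded sequence number, so a restarted jobtracker would hand out ids (job_0001, job_0002, ...) that collided with those issued by its previous incarnation. The fix prefixes every id with a timestamp taken once at jobtracker startup, so ids from different jobtracker instances stay distinct (down to the one-minute resolution of the yyyyMMddHHmm format). For a jobtracker hypothetically started at 2007-07-11 15:30:

    old:  job_0001
    new:  job_200707111530_0001
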
+ 3 - 0
CHANGES.txt

@@ -306,6 +306,9 @@ Trunk (unreleased changes)
  94. HADOOP-1584.  Fix a bug in GenericWritable which limited it to
      128 types instead of 256.  (Espen Amble Kolstad via cutting)
 
+ 95. HADOOP-1473.  Make job ids unique across jobtracker restarts.
+     (omalley via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

+ 16 - 15
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -96,7 +96,7 @@ class JobInProgress {
   boolean tasksInited = false;
 
   private LocalFileSystem localFs;
-  private String uniqueString;
+  private String jobId;
 
   // Per-job counters
   public static enum Counter { 
@@ -116,23 +116,24 @@ class JobInProgress {
    */
   public JobInProgress(String jobFile, JobTracker jobtracker, 
                        Configuration default_conf) throws IOException {
-    uniqueString = jobtracker.createUniqueId();
-    String jobid = "job_" + uniqueString;
-    String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
+    jobId = jobtracker.getTrackerIdentifier() + "_" + jobtracker.createJobId();
+    String fullJobId = "job_" + jobId;
+    String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + fullJobId;
     this.jobtracker = jobtracker;
-    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.status = new JobStatus(fullJobId, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf);
 
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
-                                                      +"/"+jobid + ".xml");
+                                                      +"/"+fullJobId + ".xml");
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ jobid + ".jar");
+                                                      +"/"+ fullJobId + ".jar");
     FileSystem fs = FileSystem.get(default_conf);
     fs.copyToLocalFile(new Path(jobFile), localJobFile);
     conf = new JobConf(localJobFile);
-    this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,
+    this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url,
                                   conf.getJobName());
     String jarFile = conf.getJar();
     if (jarFile != null) {
@@ -142,13 +143,13 @@ class JobInProgress {
 
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
-    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
-                                                                   numMapTasks + numReduceTasks + 10);
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+       (numMapTasks + numReduceTasks + 10);
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
-    JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
+    JobHistory.JobInfo.logSubmitted(fullJobId, conf.getJobName(), conf.getUser(), 
                                     System.currentTimeMillis(), jobFile); 
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
@@ -156,7 +157,7 @@ class JobInProgress {
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", jobid);
+    this.jobMetrics.setTag("jobId", fullJobId);
   }
 
   /**
@@ -219,7 +220,7 @@ class JobInProgress {
     numMapTasks = splits.length;
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
-      maps[i] = new TaskInProgress(uniqueString, jobFile, 
+      maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i].getClassName(),
                                    splits[i].getBytes(), 
                                    jobtracker, conf, this, i);
@@ -254,7 +255,7 @@ class JobInProgress {
     //
     this.reduces = new TaskInProgress[numReduceTasks];
     for (int i = 0; i < numReduceTasks; i++) {
-      reduces[i] = new TaskInProgress(uniqueString, jobFile, 
+      reduces[i] = new TaskInProgress(jobId, jobFile, 
                                       numMapTasks, i, 
                                       jobtracker, conf, this);
     }
@@ -1071,7 +1072,7 @@ class JobInProgress {
         
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
-      String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; 
+      String tempDir = conf.get("mapred.system.dir") + "/job_" + jobId; 
       fs.delete(new Path(tempDir)); 
 
     } catch (IOException e) {

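One subtlety in JobInProgress: two strings are now in play. The renamed jobId field holds the bare <trackerId>_<sequence> pair, which seeds the task ids and the per-job directory under mapred.system.dir, while fullJobId is the "job_"-prefixed form used for the status, history, metrics, and URL. A minimal sketch of the split, with hypothetical values:

    // Hypothetical values; only the string assembly mirrors the diff.
    public class JobIdFormsSketch {
      public static void main(String[] args) {
        String jobId = "200707111530_0001";       // bare id: seeds task ids and dir names
        String fullJobId = "job_" + jobId;        // external id: JobStatus, JobHistory, metrics
        String systemDir = "/mapred/system";      // stand-in for conf.get("mapred.system.dir")
        String tempDir = systemDir + "/job_" + jobId;
        System.out.println(fullJobId + " -> " + tempDir);
      }
    }
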
+ 15 - 2
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -21,16 +21,17 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -492,6 +493,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   ////////////////////////////////////////////////////////////////
   int port;
   String localMachine;
+  private String trackerIdentifier;
   long startTime;
   int totalSubmissions = 0;
 
@@ -651,6 +653,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     this.infoServer.start();
 
     this.startTime = System.currentTimeMillis();
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
+    trackerIdentifier = dateFormat.format(new Date());
 
     myMetrics = new JobTrackerMetrics(jobConf);
     this.expireTrackersThread = new Thread(this.expireTrackers,
@@ -973,6 +977,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   public String getJobTrackerMachine() {
     return localMachine;
   }
+  
+  /**
+   * Get the unique identifier (i.e. the timestamp) of this job tracker start.
+   * @return a string with a unique identifier
+   */
+  public String getTrackerIdentifier() {
+    return trackerIdentifier;
+  }
+
   public int getTrackerPort() {
     return port;
   }
@@ -1633,7 +1646,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   /**
   * Create the next sequential job id.
    */
-  String createUniqueId() {
+  String createJobId() {
     return idFormat.format(nextJobId++);
   }
 

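Putting the two new pieces together: getTrackerIdentifier() supplies the startup timestamp and createJobId() the formatted sequence number. A minimal, runnable sketch of the scheme; the four-digit padding of idFormat is an assumption based on the job_0001-style ids in the old tests, since idFormat's setup is not part of this diff:

    import java.text.NumberFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class JobIdSchemeSketch {
      public static void main(String[] args) {
        // Fixed once at jobtracker startup (minute resolution).
        String trackerIdentifier =
            new SimpleDateFormat("yyyyMMddHHmm").format(new Date());

        // Per-job counter; four-digit zero padding is assumed here.
        NumberFormat idFormat = NumberFormat.getInstance();
        idFormat.setGroupingUsed(false);
        idFormat.setMinimumIntegerDigits(4);
        int nextJobId = 1;

        String jobId = trackerIdentifier + "_" + idFormat.format(nextJobId++);
        System.out.println("job_" + jobId);   // e.g. job_200707111530_0001
      }
    }
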
+ 16 - 6
src/test/org/apache/hadoop/mapred/MRCaching.java

@@ -124,8 +124,18 @@ public class MRCaching {
     }
   }
 
-  public static boolean launchMRCache(String indir,
-                                      String outdir, String cacheDir, JobConf conf, String input)
+  public static class TestResult {
+    public RunningJob job;
+    public boolean isOutputOk;
+    TestResult(RunningJob job, boolean isOutputOk) {
+      this.job = job;
+      this.isOutputOk = isOutputOk;
+    }
+  }
+
+  public static TestResult launchMRCache(String indir,
+                                         String outdir, String cacheDir, 
+                                         JobConf conf, String input)
     throws IOException {
     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
       .toString().replace(' ', '+');
@@ -197,7 +207,7 @@ public class MRCaching {
     DistributedCache.addCacheArchive(uri1, conf);
     DistributedCache.addCacheArchive(uri2, conf);
     DistributedCache.addCacheFile(uri3, conf);
-    JobClient.runJob(conf);
+    RunningJob job = JobClient.runJob(conf);
     int count = 0;
      // after the job ran, check to see if the input from the localized cache
      // matches the real string. check if there are 3 instances or not.
@@ -208,7 +218,7 @@ public class MRCaching {
       String line = file.readLine();
       while (line != null) {
         if (!testStr.equals(line))
-          return false;
+          return new TestResult(job, false);
         count++;
         line = file.readLine();
 
@@ -216,9 +226,9 @@ public class MRCaching {
       file.close();
     }
     if (count != 3)
-      return false;
+      return new TestResult(job, false);
 
-    return true;
+    return new TestResult(job, true);
 
   }
 }

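Returning a TestResult instead of a bare boolean lets callers recover the RunningJob and ask the framework which id it actually assigned, rather than hardcoding an id that no longer survives a jobtracker restart. A hedged usage sketch, with mr standing in for a MiniMRCluster as in the tests:

    // Hypothetical caller; the paths and input string are placeholders.
    JobConf conf = mr.createJobConf();
    MRCaching.TestResult ret = MRCaching.launchMRCache(
        "/testing/wc/input", "/testing/wc/output", "/cachedir",
        conf, "The quick brown fox\n");
    assertTrue("Archives not matching", ret.isOutputOk);
    String jobid = ret.job.getJobID();   // the id actually assigned, e.g. job_200707111530_0001
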
+ 3 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java

@@ -22,6 +22,7 @@ import java.io.*;
 import junit.framework.TestCase;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.MRCaching.TestResult;
 
 /**
  * A JUnit test to test caching with DFS
@@ -39,13 +40,13 @@ public class TestMiniMRDFSCaching extends TestCase {
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(2, fileSys.getName(), 4);
       // run the wordcount example with caching
-      boolean ret = MRCaching.launchMRCache("/testing/wc/input",
+      TestResult ret = MRCaching.launchMRCache("/testing/wc/input",
                                             "/testing/wc/output",
                                             "/cachedir",
                                             mr.createJobConf(),
                                             "The quick brown fox\nhas many silly\n"
                                             + "red fox sox\n");
-      assertTrue("Archives not matching", ret);
+      assertTrue("Archives not matching", ret.isOutputOk);
     } finally {
       if (fileSys != null) {
         fileSys.close();

+ 7 - 5
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.MRCaching.TestResult;
 import org.apache.hadoop.util.Progressable;
 
 import java.io.DataInput;
@@ -52,7 +53,7 @@ public class TestMiniMRLocalFS extends TestCase {
       assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
       // run the wordcount example with caching
       JobConf job = mr.createJobConf();
-      boolean ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
+      TestResult ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
                                             TEST_ROOT_DIR + "/wc/output", 
                                             TEST_ROOT_DIR + "/cachedir",
                                             job,
@@ -60,12 +61,13 @@ public class TestMiniMRLocalFS extends TestCase {
                                             + "has many silly\n"
                                             + "red fox sox\n");
       // assert the number of lines read during caching
-      assertTrue("Failed test archives not matching", ret);
+      assertTrue("Failed test archives not matching", ret.isOutputOk);
       // test the task report fetchers
       JobClient client = new JobClient(job);
-      TaskReport[] reports = client.getMapTaskReports("job_0001");
-      assertEquals("number of maps", 10, reports.length);
-      reports = client.getReduceTaskReports("job_0001");
+      String jobid = ret.job.getJobID();
+      TaskReport[] reports = client.getMapTaskReports(jobid);
+      assertEquals("number of maps", 1, reports.length);
+      reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);
       runCustomFormats(mr);
     } finally {

+ 21 - 11
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -42,10 +42,18 @@ public class TestMiniMRWithDFS extends TestCase {
   static final int NUM_MAPS = 10;
   static final int NUM_SAMPLES = 100000;
   
-  public static String launchWordCount(JobConf conf,
-                                       String input,
-                                       int numMaps,
-                                       int numReduces) throws IOException {
+  public static class TestResult {
+    public String output;
+    public RunningJob job;
+    TestResult(RunningJob job, String output) {
+      this.job = job;
+      this.output = output;
+    }
+  }
+  public static TestResult launchWordCount(JobConf conf,
+                                           String input,
+                                           int numMaps,
+                                           int numReduces) throws IOException {
     final Path inDir = new Path("/testing/wc/input");
     final Path outDir = new Path("/testing/wc/output");
     FileSystem fs = FileSystem.get(conf);
@@ -73,8 +81,8 @@ public class TestMiniMRWithDFS extends TestCase {
     conf.setOutputPath(outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
-    JobClient.runJob(conf);
-    return readOutput(outDir, conf);
+    RunningJob job = JobClient.runJob(conf);
+    return new TestResult(job, readOutput(outDir, conf));
   }
 
   public static String readOutput(Path outDir, 
@@ -167,19 +175,21 @@ public class TestMiniMRWithDFS extends TestCase {
       // Run a word count example
       JobConf jobConf = mr.createJobConf();
       // Keeping tasks that match this pattern
-      jobConf.setKeepTaskFilesPattern("task_[0-9]*_m_000001_.*");
-      String result;
+      jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
+      TestResult result;
       result = launchWordCount(jobConf, 
                                "The quick brown fox\nhas many silly\n" + 
                                "red fox sox\n",
                                3, 1);
       assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
-                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
-      checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
+                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
+      String jobid = result.job.getJobID();
+      String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
+      checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
       // test with maps=0
       jobConf = mr.createJobConf();
       result = launchWordCount(jobConf, "owen is oom", 0, 1);
-      assertEquals("is\t1\noom\t1\nowen\t1\n", result);
+      assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
     } finally {
       if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
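
The keep-files pattern and the task-directory check change along with the job ids, because task ids embed the job id. The updated test derives the task id from the runtime job id instead of hardcoding it; a sketch with a hypothetical id:

    // Old layout: task_0002_m_000001_0 (job sequence number only).
    // New layout embeds the tracker identifier, so derive it from the job id:
    String jobid = "job_200707111530_0002";    // hypothetical getJobID() value
    String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
    // -> task_200707111530_0002_m_000001_0, which the new keep pattern
    //    task_[^_]*_[0-9]*_m_000001_.* matches.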