Browse Source

MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@788666 13f79535-47bb-0310-9956-ffa450edef68
Sharad Agarwal 16 năm trước cách đây
mục cha
commit
b4c83e8f86

+ 3 - 0
CHANGES.txt

@@ -150,6 +150,9 @@ Release 0.20.1 - Unreleased
     MAPREDUCE-130. Delete the jobconf copy from the log directory of the 
     JobTracker when the job is retired. (Amar Kamat via sharad)
 
+    MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore.
+    (Amar Kamat via sharad)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 8 - 2
src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java

@@ -51,12 +51,11 @@ class CompletedJobStatusStore implements Runnable {
   private static long HOUR = 1000 * 60 * 60;
   private static long SLEEP_TIME = 1 * HOUR;
 
-  CompletedJobStatusStore(Configuration conf, FileSystem fs) throws IOException {
+  CompletedJobStatusStore(Configuration conf) throws IOException {
     active =
       conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
 
     if (active) {
-      this.fs = fs;
       retainTime =
         conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
 
@@ -64,6 +63,9 @@ class CompletedJobStatusStore implements Runnable {
         conf.get("mapred.job.tracker.persist.jobstatus.dir", JOB_INFO_STORE_DIR);
 
       Path path = new Path(jobInfoDir);
+      
+      // set the fs
+      this.fs = path.getFileSystem(conf);
       if (!fs.exists(path)) {
         fs.mkdirs(path);
       }
@@ -72,6 +74,10 @@ class CompletedJobStatusStore implements Runnable {
         // as retain time is zero, all stored jobstatuses are deleted.
         deleteJobStatusDirs();
       }
+      LOG.info("Completed job store activated/configured with retain-time : " 
+               + retainTime + " , job-info-dir : " + jobInfoDir);
+    } else {
+      LOG.info("Completed job store is inactive");
     }
   }
 

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1711,7 +1711,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
   }
 
   private static SimpleDateFormat getDateFormat() {

+ 30 - 0
src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java

@@ -22,11 +22,16 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
+  static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "job-status-persistence");
+  
   private JobID runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
     Writer wr = new OutputStreamWriter(os);
@@ -103,4 +108,29 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
     }
   }
 
+  /**
+   * Test if the completed job status is persisted to localfs.
+   */
+  public void testLocalPersistency() throws Exception {
+    FileSystem fs = FileSystem.getLocal(createJobConf());
+    
+    fs.delete(TEST_DIR, true);
+    
+    Properties config = new Properties();
+    config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
+    config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
+    config.setProperty("mapred.job.tracker.persist.jobstatus.dir", 
+                       fs.makeQualified(TEST_DIR).toString());
+    stopCluster();
+    startCluster(false, config);
+    JobID jobId = runJob();
+    JobClient jc = new JobClient(createJobConf());
+    RunningJob rj = jc.getJob(jobId);
+    assertNotNull(rj);
+    
+    // check if the local fs has the data
+    Path jobInfo = new Path(TEST_DIR, rj.getID() + ".info");
+    assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
+    fs.delete(TEST_DIR, true);
+  }
 }