Преглед изворни кода

MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact the JobTracker lock hierarchy wasn't maintained in some JobInProgress method calls. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@803050 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das пре 16 година
родитељ
комит
c4f7be6e23

+ 4 - 0
CHANGES.txt

@@ -205,6 +205,10 @@ Release 0.20.1 - Unreleased
     MAPREDUCE-838. Fixes a problem in the way commit of task outputs
     MAPREDUCE-838. Fixes a problem in the way commit of task outputs
     happens. The bug was that even if commit failed, the task would
     happens. The bug was that even if commit failed, the task would
     be declared as successful. (Amareshwari Sriramadasu via ddas)
     be declared as successful. (Amareshwari Sriramadasu via ddas)
+
+    MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
+    the JobTracker lock hierarchy wasn't maintained in some JobInProgress
+    method calls. (Amar Kamat via ddas)
  
  
 Release 0.20.0 - 2009-04-15
 Release 0.20.0 - 2009-04-15
 
 

+ 1 - 1
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -935,7 +935,7 @@ class CapacityTaskScheduler extends TaskScheduler {
     //Start thread for initialization
     //Start thread for initialization
     if (initializationPoller == null) {
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,schedConf,queues);
+          jobQueuesManager,schedConf,queues, taskTrackerManager);
     }
     }
     initializationPoller.init(queueManager.getQueues(), schedConf);
     initializationPoller.init(queueManager.getQueues(), schedConf);
     initializationPoller.setDaemon(true);
     initializationPoller.setDaemon(true);

+ 10 - 14
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java

@@ -137,19 +137,12 @@ public class JobInitializationPoller extends Thread {
           LOG.info("Initializing job : " + job.getJobID() + " in Queue "
           LOG.info("Initializing job : " + job.getJobID() + " in Queue "
               + job.getProfile().getQueueName() + " For user : "
               + job.getProfile().getQueueName() + " For user : "
               + job.getProfile().getUser());
               + job.getProfile().getUser());
-          try {
-            if (startIniting) {
-              setInitializingJob(job);
-              job.initTasks();
-              setInitializingJob(null);
-            } else {
-              break;
-            }
-          } catch (Throwable t) {
-            LOG.info("Job initialization failed:\n"
-                + StringUtils.stringifyException(t));
-            jobQueueManager.removeJobFromWaitingQueue(job);
-            job.fail(); 
+          if (startIniting) {
+            setInitializingJob(job);
+            ttm.initJob(job);
+            setInitializingJob(null);
+          } else {
+            break;
           }
           }
         }
         }
       }
       }
@@ -246,6 +239,7 @@ public class JobInitializationPoller extends Thread {
 
 
   private volatile boolean running;
   private volatile boolean running;
 
 
+  private TaskTrackerManager ttm;
   /**
   /**
    * The map which provides information which thread should be used to
    * The map which provides information which thread should be used to
    * initialize jobs for a given job queue.
    * initialize jobs for a given job queue.
@@ -253,13 +247,15 @@ public class JobInitializationPoller extends Thread {
   private HashMap<String, JobInitializationThread> threadsToQueueMap;
   private HashMap<String, JobInitializationThread> threadsToQueueMap;
 
 
   public JobInitializationPoller(JobQueuesManager mgr,
   public JobInitializationPoller(JobQueuesManager mgr,
-      CapacitySchedulerConf rmConf, Set<String> queue) {
+      CapacitySchedulerConf rmConf, Set<String> queue, 
+      TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
     initializedJobs = new HashMap<JobID,JobInProgress>();
     jobQueues = new HashMap<String, QueueInfo>();
     jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
     this.jobQueueManager = mgr;
     threadsToQueueMap = new HashMap<String, JobInitializationThread>();
     threadsToQueueMap = new HashMap<String, JobInitializationThread>();
     super.setName("JobInitializationPollerThread");
     super.setName("JobInitializationPollerThread");
     running = true;
     running = true;
+    this.ttm = ttm;
   }
   }
 
 
   /*
   /*

+ 30 - 24
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -97,8 +98,9 @@ public class TestCapacityScheduler extends TestCase {
     
     
     public ControlledInitializationPoller(JobQueuesManager mgr,
     public ControlledInitializationPoller(JobQueuesManager mgr,
                                           CapacitySchedulerConf rmConf,
                                           CapacitySchedulerConf rmConf,
-                                          Set<String> queues) {
-      super(mgr, rmConf, queues);
+                                          Set<String> queues,
+                                          TaskTrackerManager ttm) {
+      super(mgr, rmConf, queues, ttm);
     }
     }
     
     
     @Override
     @Override
@@ -468,6 +470,27 @@ public class TestCapacityScheduler extends TestCase {
       job.kill();
       job.kill();
     }
     }
 
 
+    @Override
+    public synchronized void failJob(JobInProgress job) {
+      finalizeJob(job, JobStatus.FAILED);
+      job.fail();
+    }
+    
+    public void initJob(JobInProgress jip) {
+      try {
+        JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+        jip.initTasks();
+        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+        JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
+            EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
+        for (JobInProgressListener listener : listeners) {
+          listener.jobUpdated(event);
+        }
+      } catch (Exception ioe) {
+        failJob(jip);
+      }
+    }
+    
     public void removeJob(JobID jobid) {
     public void removeJob(JobID jobid) {
       jobs.remove(jobid);
       jobs.remove(jobid);
     }
     }
@@ -705,7 +728,7 @@ public class TestCapacityScheduler extends TestCase {
     controlledInitializationPoller = new ControlledInitializationPoller(
     controlledInitializationPoller = new ControlledInitializationPoller(
         scheduler.jobQueuesManager,
         scheduler.jobQueuesManager,
         resConf,
         resConf,
-        resConf.getQueues());
+        resConf.getQueues(), taskTrackerManager);
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
     scheduler.setConf(conf);
     //by default disable speculative execution.
     //by default disable speculative execution.
@@ -733,7 +756,7 @@ public class TestCapacityScheduler extends TestCase {
   private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
   private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
       throws IOException {
       throws IOException {
     FakeJobInProgress j = submitJob(state, jobConf);
     FakeJobInProgress j = submitJob(state, jobConf);
-    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    taskTrackerManager.initJob(j);
     return j;
     return j;
   }
   }
 
 
@@ -753,21 +776,10 @@ public class TestCapacityScheduler extends TestCase {
                                              String queue, String user) 
                                              String queue, String user) 
   throws IOException {
   throws IOException {
     FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
     FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
-    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    taskTrackerManager.initJob(j);
     return j;
     return j;
   }
   }
   
   
-  // Note that there is no concept of setup tasks here. So init itself should 
-  // report the job-status change
-  private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip) 
-  throws IOException {
-    JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-    jip.initTasks();
-    JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-    return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, 
-                                    oldStatus, newStatus);
-  }
-  
   // test job run-state change
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
   public void testJobRunStateChange() throws IOException {
     // start the scheduler
     // start the scheduler
@@ -794,16 +806,10 @@ public class TestCapacityScheduler extends TestCase {
     // first (may be because of the setup tasks).
     // first (may be because of the setup tasks).
     
     
     // init the lower ranked job first
     // init the lower ranked job first
-    JobChangeEvent event = initTasksAndReportEvent(fjob2);
-    
-    // inform the scheduler
-    scheduler.jobQueuesManager.jobUpdated(event);
+    taskTrackerManager.initJob(fjob2);
     
     
     // init the higher ordered job later
     // init the higher ordered job later
-    event = initTasksAndReportEvent(fjob1);
-    
-    // inform the scheduler
-    scheduler.jobQueuesManager.jobUpdated(event);
+    taskTrackerManager.initJob(fjob1);
     
     
     // check if the jobs are missing from the waiting queue
     // check if the jobs are missing from the waiting queue
     // The jobs are not removed from waiting queue until they are scheduled 
     // The jobs are not removed from waiting queue until they are scheduled 

+ 1 - 0
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -113,6 +113,7 @@ public class FairScheduler extends TaskScheduler {
     try {
     try {
       Configuration conf = getConf();
       Configuration conf = getConf();
       this.eagerInitListener = new EagerTaskInitializationListener(conf);
       this.eagerInitListener = new EagerTaskInitializationListener(conf);
+      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
       eagerInitListener.start();
       eagerInitListener.start();
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(jobListener);
       taskTrackerManager.addJobInProgressListener(jobListener);

+ 8 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -172,6 +172,14 @@ public class TestFairScheduler extends TestCase {
       return null;
       return null;
     }
     }
 
 
+    public void initJob (JobInProgress job) {
+      // do nothing
+    }
+    
+    public void failJob (JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     // Test methods
     
     
     public void submitJob(JobInProgress job) throws IOException {
     public void submitJob(JobInProgress job) throws IOException {

+ 7 - 11
src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java

@@ -67,7 +67,7 @@ class EagerTaskInitializationListener extends JobInProgressListener {
     }
     }
   }
   }
   
   
-  static class InitJob implements Runnable {
+  class InitJob implements Runnable {
   
   
     private JobInProgress job;
     private JobInProgress job;
     
     
@@ -76,16 +76,7 @@ class EagerTaskInitializationListener extends JobInProgressListener {
     }
     }
     
     
     public void run() {
     public void run() {
-      try {
-        LOG.info("Initializing " + job.getJobID());
-        job.initTasks();
-      } catch (Throwable t) {
-        LOG.error("Job initialization failed:\n" +
-            StringUtils.stringifyException(t));
-        if (job != null) {
-          job.fail();
-        }
-      }
+      ttm.initJob(job);
     }
     }
   }
   }
   
   
@@ -94,12 +85,17 @@ class EagerTaskInitializationListener extends JobInProgressListener {
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   private ExecutorService threadPool;
   private ExecutorService threadPool;
   private int numThreads;
   private int numThreads;
+  private TaskTrackerManager ttm;
   
   
   public EagerTaskInitializationListener(Configuration conf) {
   public EagerTaskInitializationListener(Configuration conf) {
     numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
     numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
     threadPool = Executors.newFixedThreadPool(numThreads);
     threadPool = Executors.newFixedThreadPool(numThreads);
   }
   }
   
   
+  public void setTaskTrackerManager(TaskTrackerManager ttm) {
+    this.ttm = ttm;
+  }
+  
   public void start() throws IOException {
   public void start() throws IOException {
     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
     jobInitManagerThread.setDaemon(true);
     jobInitManagerThread.setDaemon(true);

+ 19 - 11
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -54,6 +54,16 @@ import org.apache.hadoop.util.StringUtils;
  * ***********************************************************
  * ***********************************************************
  */
  */
 class JobInProgress {
 class JobInProgress {
+  /**
+   * Used when a kill is issued to a job which is initializing.
+   */
+  static class KillInterruptedException extends InterruptedException {
+   private static final long serialVersionUID = 1L;
+    public KillInterruptedException(String msg) {
+      super(msg);
+    }
+  }
+
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
     
     
   JobProfile profile;
   JobProfile profile;
@@ -377,12 +387,13 @@ class JobInProgress {
    * Construct the splits, etc.  This is invoked from an async
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    * thread so that split-computation doesn't block anyone.
    */
    */
-  public synchronized void initTasks() throws IOException {
-    if (tasksInited.get()) {
+  public synchronized void initTasks() 
+  throws IOException, KillInterruptedException {
+    if (tasksInited.get() || isComplete()) {
       return;
       return;
     }
     }
     synchronized(jobInitKillStatus){
     synchronized(jobInitKillStatus){
-      if(jobInitKillStatus.killed) {
+      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
         return;
         return;
       }
       }
       jobInitKillStatus.initStarted = true;
       jobInitKillStatus.initStarted = true;
@@ -493,9 +504,7 @@ class JobInProgress {
     synchronized(jobInitKillStatus){
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
       jobInitKillStatus.initDone = true;
       if(jobInitKillStatus.killed) {
       if(jobInitKillStatus.killed) {
-        //setup not launched so directly terminate
-        terminateJob(JobStatus.KILLED);
-        return;
+        throw new KillInterruptedException("Job " + jobId + " killed in init");
       }
       }
     }
     }
     
     
@@ -2199,15 +2208,12 @@ class JobInProgress {
   }
   }
 
 
   /**
   /**
-   * Kill the job and all its component tasks. This method is called from 
+   * Kill the job and all its component tasks. This method should be called from 
    * jobtracker and should return fast as it locks the jobtracker.
    * jobtracker and should return fast as it locks the jobtracker.
    */
    */
   public void kill() {
   public void kill() {
     boolean killNow = false;
     boolean killNow = false;
     synchronized(jobInitKillStatus) {
     synchronized(jobInitKillStatus) {
-      if(jobInitKillStatus.killed) {//job is already marked for killing
-        return;
-      }
       jobInitKillStatus.killed = true;
       jobInitKillStatus.killed = true;
       //if not in middle of init, terminate it now
       //if not in middle of init, terminate it now
       if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
       if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
@@ -2221,7 +2227,9 @@ class JobInProgress {
   }
   }
   
   
   /**
   /**
-   * Fails the job and all its component tasks.
+   * Fails the job and all its component tasks. This should be called only from
+   * {@link JobInProgress} or {@link JobTracker}. Look at 
+   * {@link JobTracker#failJob(JobInProgress)} for more details.
    */
    */
   synchronized void fail() {
   synchronized void fail() {
     terminate(JobStatus.FAILED);
     terminate(JobStatus.FAILED);

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java

@@ -47,7 +47,7 @@ class JobQueueTaskScheduler extends TaskScheduler {
   public synchronized void start() throws IOException {
   public synchronized void start() throws IOException {
     super.start();
     super.start();
     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
-    
+    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
     eagerTaskInitializationListener.start();
     eagerTaskInitializationListener.start();
     taskTrackerManager.addJobInProgressListener(
     taskTrackerManager.addJobInProgressListener(
         eagerTaskInitializationListener);
         eagerTaskInitializationListener);

+ 67 - 3
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -820,11 +821,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           hasUpdates = true;
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           try {
           try {
-            jip.initTasks();
+            initJob(jip);
           } catch (Throwable t) {
           } catch (Throwable t) {
             LOG.error("Job initialization failed : \n" 
             LOG.error("Job initialization failed : \n" 
                       + StringUtils.stringifyException(t));
                       + StringUtils.stringifyException(t));
-            jip.fail(); // fail the job
+            failJob(jip);
             throw new IOException(t);
             throw new IOException(t);
           }
           }
         }
         }
@@ -3085,8 +3086,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return;
       return;
     }
     }
         
         
-    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    killJob(job);
+  }
+  
+  private synchronized void killJob(JobInProgress job) {
+    LOG.info("Killing job " + job.getJobID());
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     job.kill();
     job.kill();
     
     
     // Inform the listeners if the job is killed
     // Inform the listeners if the job is killed
@@ -3105,6 +3111,64 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
     }
   }
   }
 
 
+  public void initJob(JobInProgress job) {
+    if (null == job) {
+      LOG.info("Init on null job is not valid");
+      return;
+    }
+	        
+    try {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+      LOG.info("Initializing " + job.getJobID());
+      job.initTasks();
+      // Inform the listeners if the job state has changed
+      // Note that the job will be in PREP state.
+      JobStatus newStatus = (JobStatus)job.getStatus().clone();
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+              newStatus);
+        synchronized (JobTracker.this) {
+          updateJobInProgressListeners(event);
+        }
+      }
+    } catch (KillInterruptedException kie) {
+      //   If job was killed during initialization, job state will be KILLED
+      LOG.error("Job initialization interrupted:\n" +
+          StringUtils.stringifyException(kie));
+      killJob(job);
+    } catch (Throwable t) {
+      // If job initialization fails, the job state will be FAILED
+      LOG.error("Job initialization failed:\n" +
+          StringUtils.stringifyException(t));
+      failJob(job);
+    }
+	 }
+
+  /**
+   * Fail a job and inform the listeners. Other components in the framework 
+   * should use this to fail a job.
+   */
+  public synchronized void failJob(JobInProgress job) {
+    if (null == job) {
+      LOG.info("Fail on null job is not valid");
+      return;
+    }
+         
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+    LOG.info("Failing job " + job.getJobID());
+    job.fail();
+     
+    // Inform the listeners if the job state has changed
+    JobStatus newStatus = (JobStatus)job.getStatus().clone();
+    if (prevStatus.getRunState() != newStatus.getRunState()) {
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+            newStatus);
+      updateJobInProgressListeners(event);
+    }
+  }
+  
   /**
   /**
    * Set the priority of a job
    * Set the priority of a job
    * @param jobid id of the job
    * @param jobid id of the job

+ 14 - 0
src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java

@@ -88,4 +88,18 @@ interface TaskTrackerManager {
    * @return jobInProgress object
    * @return jobInProgress object
    */
    */
   public JobInProgress getJob(JobID jobid);
   public JobInProgress getJob(JobID jobid);
+  
+  /**
+   * Initialize the Job
+   * 
+   * @param job JobInProgress object
+   */
+  public void initJob(JobInProgress job);
+  
+  /**
+   * Fail a job.
+   * 
+   * @param job JobInProgress object
+   */
+  public void failJob(JobInProgress job);
 }
 }

+ 1 - 1
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -500,7 +500,7 @@ public class MiniMRCluster {
    */
    */
   public void initializeJob(JobID jobId) throws IOException {
   public void initializeJob(JobID jobId) throws IOException {
     JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
     JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
-    job.initTasks();
+    jobTracker.getJobTracker().initJob(job);
   }
   }
   
   
   /**
   /**

+ 8 - 0
src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -184,6 +184,14 @@ public class TestJobQueueTaskScheduler extends TestCase {
       return null;
       return null;
     }
     }
 
 
+    public void initJob(JobInProgress job) {
+      // do nothing
+    }
+    
+    public void failJob(JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     // Test methods
     
     
     public void submitJob(JobInProgress job) throws IOException {
     public void submitJob(JobInProgress job) throws IOException {

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

@@ -479,7 +479,7 @@ public class TestJobTrackerRestart extends TestCase {
     JobID id = job2.getID();*/
     JobID id = job2.getID();*/
     JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     
     
-    jip.initTasks();
+    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
     
     
     // find out the history filename
     // find out the history filename
     String history = 
     String history = 
@@ -494,7 +494,7 @@ public class TestJobTrackerRestart extends TestCase {
     id = job1.getID();
     id = job1.getID();
     jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     
     
-    jip.initTasks();
+    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
     
     
     //  make sure that cleanup is launched and is waiting
     //  make sure that cleanup is launched and is waiting
     while (!jip.isCleanupLaunched()) {
     while (!jip.isCleanupLaunched()) {

+ 33 - 0
src/test/org/apache/hadoop/mapred/TestParallelInitialization.java

@@ -27,6 +27,8 @@ import java.util.Map;
 import junit.framework.TestCase;
 import junit.framework.TestCase;
 
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 
 public class TestParallelInitialization extends TestCase {
 public class TestParallelInitialization extends TestCase {
   
   
@@ -135,8 +137,39 @@ public class TestParallelInitialization extends TestCase {
       return null;
       return null;
     }
     }
 
 
+    public void initJob(JobInProgress job) {
+      try {
+        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+        job.initTasks();
+        JobStatus newStatus = (JobStatus)job.getStatus().clone();
+        if (prevStatus.getRunState() != newStatus.getRunState()) {
+          JobStatusChangeEvent event = 
+            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+                newStatus);
+          for (JobInProgressListener listener : listeners) {
+            listener.jobUpdated(event);
+          }
+        }
+      } catch (Exception ioe) {
+        failJob(job);
+      }
+    }
     // Test methods
     // Test methods
     
     
+    public synchronized void failJob(JobInProgress job) {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+      job.fail();
+      JobStatus newStatus = (JobStatus)job.getStatus().clone();
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+              newStatus);
+        for (JobInProgressListener listener : listeners) {
+          listener.jobUpdated(event);
+        }
+      }
+    }
+    
     public void submitJob(JobInProgress job) throws IOException {
     public void submitJob(JobInProgress job) throws IOException {
       for (JobInProgressListener listener : listeners) {
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
         listener.jobAdded(job);