
MAPREDUCE-2535. Fix NPE in JobClient caused by retirement. (Robert Joseph
Evans via cdouglas)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1136709 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago (commit 7a68777fc5)
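
In short: JobClient.NetworkedJob previously fetched its JobProfile inside the constructor and assumed a non-null JobStatus. Once the JobTracker retires a job (e.g. 24 hours after it completes), getJobStatus() and getJobProfile() can both return null, and the old code dereferenced those nulls. A minimal sketch of the failure mode, assuming a jobSubmitClient handle of this branch's JobSubmissionProtocol type:

    // Before this commit (illustrative, not the exact source):
    JobStatus status = jobSubmitClient.getJobStatus(jobId); // null once the job is retired
    RunningJob job = client.new NetworkedJob(status);       // NPE: the constructor calls status.getJobID()

The fix below makes NetworkedJob static, injects the status, profile, and protocol explicitly, rejects nulls with an IOException, makes updateStatus() fail loudly when the JobTracker has forgotten the job, and moves the system tests from constructing NetworkedJob directly to calling JobClient.getJob().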

+ 3 - 0
CHANGES.txt

@@ -11,6 +11,9 @@ Release 0.20.204.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-2535. Fix NPE in JobClient caused by retirement. (Robert Joseph
+    Evans via cdouglas)
+
     HDFS-2044. TestQueueProcessingStatistics failing automatic test due to 
     timing issues. (mattf)
 

+ 2 - 3
src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.mapreduce.test.system.JTProtocol;
 import org.apache.hadoop.mapreduce.test.system.JobInfo;
 import org.apache.hadoop.mapreduce.test.system.MRCluster;
@@ -94,7 +93,7 @@ public class TestTaskKillingOfStreamingJob {
         jtClient.isTaskStarted(taskInfo));
 
     JobInfo jInfo = wovenClient.getJobInfo(jobId); 
-    NetworkedJob networkJob = client.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = client.getJob(jobId);
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
     networkJob.killTask(taskAttID, false);
@@ -153,7 +152,7 @@ public class TestTaskKillingOfStreamingJob {
         jtClient.isTaskStarted(taskInfo));
     
     JobInfo jInfo = wovenClient.getJobInfo(jobId);
-    NetworkedJob networkJob = client.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = client.getJob(jobId);
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
     networkJob.killTask(taskAttID, true);

+ 27 - 11
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -183,7 +183,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
    * a JobProfile object to provide some info, and interacts with the
    * remote service to provide certain functionality.
    */
-  class NetworkedJob implements RunningJob {
+  static class NetworkedJob implements RunningJob {
+    private JobSubmissionProtocol jobSubmitClient;
     JobProfile profile;
     JobStatus status;
     long statustime;
@@ -191,16 +192,26 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     /**
      * We store a JobProfile and a timestamp for when we last
      * acquired the job profile.  If the job is null, then we cannot
-     * perform any of the tasks.  The job might be null if the JobTracker
-     * has completely forgotten about the job.  (eg, 24 hours after the
-     * job completes.)
+     * perform any of the tasks, so we throw an exception.
+     * The job might be null if the JobTracker has completely forgotten
+     * about the job.  (e.g., 24 hours after the job completes.)
      */
-    public NetworkedJob(JobStatus job) throws IOException {
+    public NetworkedJob(JobStatus job, JobProfile prof, JobSubmissionProtocol jobSubmitClient) throws IOException {
       this.status = job;
-      this.profile = jobSubmitClient.getJobProfile(job.getJobID());
+      this.profile = prof;
+      this.jobSubmitClient = jobSubmitClient;
+      if(this.status == null) {
+        throw new IOException("The Job status cannot be null");
+      }
+      if(this.profile == null) {
+        throw new IOException("The Job profile cannot be null");
+      }
+      if(this.jobSubmitClient == null) {
+        throw new IOException("The Job Submission Protocol cannot be null");
+      }
       this.statustime = System.currentTimeMillis();
     }
-
+    
     /**
      * Some methods rely on having a recent job profile object.  Refresh
      * it, if necessary
@@ -217,6 +228,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
      */
     synchronized void updateStatus() throws IOException {
       this.status = jobSubmitClient.getJobStatus(profile.getJobID());
+      if(this.status == null) {
+        throw new IOException("The job appears to have been removed."); 
+      }
       this.statustime = System.currentTimeMillis();
     }
 
@@ -863,8 +877,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           printTokens(jobId, jobCopy.getCredentials());
           status = jobSubmitClient.submitJob(
               jobId, submitJobDir.toString(), jobCopy.getCredentials());
-          if (status != null) {
-            return new NetworkedJob(status);
+          JobProfile prof = jobSubmitClient.getJobProfile(jobId);
+          if (status != null && prof != null) {
+            return new NetworkedJob(status, prof, jobSubmitClient);
           } else {
             throw new IOException("Could not launch job");
           }
@@ -1011,8 +1026,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
    */
   public RunningJob getJob(JobID jobid) throws IOException {
     JobStatus status = jobSubmitClient.getJobStatus(jobid);
-    if (status != null) {
-      return new NetworkedJob(status);
+    JobProfile profile = jobSubmitClient.getJobProfile(jobid);
+    if (status != null && profile != null) {
+      return new NetworkedJob(status, profile, jobSubmitClient);
     } else {
       return null;
     }
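
For callers, the practical consequence is that JobClient.getJob() now returns null when either the status or the profile of a job is gone, instead of handing back an object that throws later. A minimal usage sketch, assuming jobId and taskAttemptId variables of the old-API org.apache.hadoop.mapred types:

    RunningJob job = client.getJob(jobId);
    if (job == null) {
      // The JobTracker has forgotten this job (e.g. it was retired); nothing to query or kill.
    } else {
      job.killTask(taskAttemptId, false); // safe: job holds a non-null status and profile
    }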

+ 99 - 0
src/test/org/apache/hadoop/mapred/TestNetworkedJob.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+public class TestNetworkedJob {
+
+  @Test(expected=IOException.class)
+  public void testNullStatus() throws Exception {
+    JobProfile mockProf = mock(JobProfile.class);
+    JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+    new JobClient.NetworkedJob(null, mockProf, mockClient);
+  }
+  
+  @Test(expected=IOException.class)
+  public void testNullProfile() throws Exception {
+    JobStatus mockStatus = mock(JobStatus.class);
+    JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+    new JobClient.NetworkedJob(mockStatus, null, mockClient);
+  }
+
+  @Test(expected=IOException.class)
+  public void testNullClient() throws Exception {
+    JobStatus mockStatus = mock(JobStatus.class);
+    JobProfile mockProf = mock(JobProfile.class);
+    new JobClient.NetworkedJob(mockStatus, mockProf, null);
+  }
+  
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testBadUpdate() throws Exception {
+    JobStatus mockStatus = mock(JobStatus.class);
+    JobProfile mockProf = mock(JobProfile.class);
+    JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+    
+    JobID id = new JobID("test",0);
+    
+    RunningJob rj = new JobClient.NetworkedJob(mockStatus, mockProf, mockClient);
+    
+    when(mockProf.getJobID()).thenReturn(id);
+    when(mockClient.getJobStatus(id)).thenReturn(null);
+    
+    boolean caught = false;
+    try {
+      rj.isSuccessful();
+    } catch(IOException e) {
+      caught = true;
+    }
+    assertTrue("Expected updateStatus to throw an IOException but it did not", caught);
+    
+    //verification
+    verify(mockProf).getJobID();
+    verify(mockClient).getJobStatus(id);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testGetNullCounters() throws Exception {
+    JobStatus mockStatus = mock(JobStatus.class);
+    JobProfile mockProf = mock(JobProfile.class);
+    JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+    RunningJob underTest =
+      new JobClient.NetworkedJob(mockStatus, mockProf, mockClient); 
+
+    JobID id = new JobID("test", 0);
+    when(mockProf.getJobID()).thenReturn(id);
+    when(mockClient.getJobCounters(id)).thenReturn(null);
+    assertNull(underTest.getCounters());
+    //verification
+    verify(mockClient).getJobCounters(id);
+  }
+
+}

+ 2 - 3
src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.mapreduce.test.system.TTClient;
 import org.apache.hadoop.mapreduce.test.system.JTProtocol;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
 import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -189,8 +188,8 @@ public class TestJobCacheDirectoriesCleanUp {
           int MAX_MAP_TASK_ATTEMPTS = Integer.
                parseInt(jobConf.get("mapred.map.max.attempts"));
           while(taskinfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) {
-            NetworkedJob networkJob = jtClient.getClient().
-               new NetworkedJob(jobInfo.getStatus());
+            org.apache.hadoop.mapreduce.JobID temp = jobInfo.getID();
+            RunningJob networkJob = client.getJob(new JobID(temp.getJtIdentifier(), temp.getId()));
             networkJob.killTask(taskAttID, true);
             taskinfo = rtClient.getTaskInfo(taskinfo.getTaskID());
             taskAttID = new TaskAttemptID(taskId, taskinfo.numFailedAttempts());
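
The two lines added above rebuild the old-API JobID by hand from jobInfo.getID(). The same conversion can also be written with the static downgrade helper, assuming it exists on this branch as it does for TaskID (see the TaskID.downgrade calls elsewhere in this commit):

    // Hypothetical equivalent of the manual conversion above:
    JobID oldId = JobID.downgrade(jobInfo.getID()); // org.apache.hadoop.mapred.JobID
    RunningJob networkJob = client.getJob(oldId);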

+ 1 - 3
src/test/system/java/org/apache/hadoop/mapred/TestTaskChildsKilling.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.test.system.TTProtocol;
 import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.Tool;
 import org.junit.AfterClass;
@@ -271,8 +270,7 @@ public class TestTaskChildsKilling {
     Assert.assertTrue("Map process is not alive before task kills.", 
         ttIns.isProcessTreeAlive(pid));
 
-    NetworkedJob networkJob = client.new NetworkedJob(jInfo.getStatus());
-    networkJob.killTask(taskAttID, false);
+    runJob.killTask(taskAttID, false);
 
     LOG.info("Waiting till the task is killed...");
     taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());

+ 9 - 11
src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.mapreduce.test.system.TaskInfo;
 import org.apache.hadoop.mapreduce.test.system.TTClient;
 import org.apache.hadoop.mapreduce.test.system.JTClient;
 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -100,7 +99,7 @@ public class TestTaskKilling {
         jtClient.isTaskStarted(taskInfo));
 
     // Fail the running task.
-    NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = jobClient.getJob(jobId);
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
     networkJob.killTask(taskAttID, true);
@@ -206,7 +205,7 @@ public class TestTaskKilling {
             taskAttID + " has not been found while task was running.", 
                     isTempFolderExists);
     
-    NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = jobClient.getJob(id);
     networkJob.killTask(taskAttID, false);
     ttClient.getProxy().sendAction(action);
     taskInfo = remoteJTClient.getTaskInfo(tID);
@@ -353,8 +352,7 @@ public class TestTaskKilling {
         TaskAttemptID tAttID = new TaskAttemptID(taskId, 
             taskInfo.numFailedAttempts());
         while(taskInfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) {
-          NetworkedJob networkJob = jtClient.getClient().
-             new NetworkedJob(jobInfo.getStatus());
+          RunningJob networkJob = jobClient.getJob(id);
           networkJob.killTask(taskAttID, true);
           taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
           taskAttID = new TaskAttemptID(taskId, taskInfo.numFailedAttempts());
@@ -484,8 +482,7 @@ public class TestTaskKilling {
             taskIdKilled = taskid.toString();
             taskAttemptID = new TaskAttemptID(taskid, i);
             LOG.info("taskAttemptid going to be killed is : " + taskAttemptID);
-            (jobClient.new NetworkedJob(jInfo.getStatus())).
-                killTask(taskAttemptID,true);
+            rJob.killTask(taskAttemptID,true);
             checkTaskCompletionEvent(taskAttemptID, jInfo);
             break;
           } else {
@@ -495,8 +492,7 @@ public class TestTaskKilling {
               UtilsForTests.waitFor(20000);
               LOG.info("taskAttemptid going to be killed is : " +
                   taskAttemptID);
-              (jobClient.new NetworkedJob(jInfo.getStatus())).
-                  killTask(taskAttemptID,true);
+              rJob.killTask(taskAttemptID,true);
               checkTaskCompletionEvent(taskAttemptID,jInfo);
               break;
             }
@@ -536,8 +532,10 @@ public class TestTaskKilling {
     boolean match = false;
     int count = 0;
     while (!match) {
-      TaskCompletionEvent[] taskCompletionEvents =  jobClient.new
-        NetworkedJob(jInfo.getStatus()).getTaskCompletionEvents(0);
+      org.apache.hadoop.mapreduce.JobID temp = jInfo.getID();
+      RunningJob rJob = jobClient.getJob(new JobID(temp.getJtIdentifier(), temp.getId()));
+ 
+      TaskCompletionEvent[] taskCompletionEvents =  rJob.getTaskCompletionEvents(0);
       for (TaskCompletionEvent taskCompletionEvent : taskCompletionEvents) {
         LOG.info("taskCompletionEvent.getTaskAttemptId().toString() is : " + 
           taskCompletionEvent.getTaskAttemptId().toString());

+ 1 - 4
src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.test.system.MRCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.UtilsForTests;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -340,11 +339,9 @@ public class TestTaskTrackerInfoSuccessfulFailedJobs {
     Assert.assertTrue("Task has not been started for 1 min.", 
       count != 60);
 
-    NetworkedJob networkJob = (cluster.getJTClient().getClient()).new 
-      NetworkedJob(jInfo.getStatus());
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
-    networkJob.killTask(taskAttID, false);
+    rJob.killTask(taskAttID, false);
 
     count = 0;
     LOG.info("Waiting till the job is completed...");

+ 4 - 2
src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java

@@ -124,9 +124,11 @@ public abstract class HadoopDaemonRemoteCluster
         }
       } finally {
         try {
-          reader.close();
+          if(reader != null) {
+              reader.close();
+          }
         } catch (IOException e) {
-          LOG.warn("Could not close reader");
+          LOG.warn("Could not close reader", e);
         }
       }
       LOG.info("Created HadoopDaemonInfo for " + cmd + " " + role + " from "