
MAPREDUCE-2618. Fix NPE in 0 map 0 reduce jobs. (Jeffrey Naisbitt via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1140266 13f79535-47bb-0310-9956-ffa450edef68
Luke Lu 14 years ago
parent commit
98e14e4ece
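The change in outline: a 0-map/0-reduce job never produces JOB_TASK_COMPLETED events, so the job could neither finish nor be failed cleanly at init time. The patch below adds a JOB_COMPLETED event that StartTransition fires when there are no tasks, routes it through a new JobNoTasksCompletedTransition, and factors the success check into checkJobCompleteSuccess() so both paths commit the job and mark it SUCCEEDED. The snippet here is a minimal, self-contained sketch of that control flow for illustration only; the class and member names are simplified stand-ins, not the JobImpl/StateMachineFactory wiring used in the actual patch.

// Simplified, hypothetical stand-ins for illustration; the real change uses
// JobImpl, JobEventType and the Yarn state machine shown in the diffs below.
import java.util.Collections;
import java.util.Map;

public class ZeroTaskJobSketch {

  enum JobState { RUNNING, SUCCEEDED, FAILED }
  enum JobEventType { JOB_TASK_COMPLETED, JOB_COMPLETED }

  private final Map<String, String> tasks = Collections.emptyMap(); // 0 maps, 0 reduces
  private int completedTaskCount = 0;
  private JobState state = JobState.RUNNING;

  // Mirrors checkJobCompleteSuccess(): the job succeeds once every task is
  // done, which for a job with no tasks is true immediately.
  JobState checkJobCompleteSuccess() {
    if (completedTaskCount == tasks.size()) {
      // the real code commits via the OutputCommitter and logs job history here
      state = JobState.SUCCEEDED;
      return state;
    }
    return null; // not finished yet
  }

  // Mirrors the addition to StartTransition: with no tasks there will never
  // be a JOB_TASK_COMPLETED event, so fire JOB_COMPLETED explicitly.
  void start() {
    if (tasks.isEmpty()) {
      handle(JobEventType.JOB_COMPLETED);
    }
  }

  // Mirrors JobNoTasksCompletedTransition: try to finish, else keep the state.
  void handle(JobEventType event) {
    if (event == JobEventType.JOB_COMPLETED) {
      JobState result = checkJobCompleteSuccess();
      if (result != null) {
        state = result;
      }
    }
  }

  public static void main(String[] args) {
    ZeroTaskJobSketch job = new ZeroTaskJobSketch();
    job.start();
    System.out.println("Final state: " + job.state); // SUCCEEDED
  }
}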

+ 2 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,8 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    MAPREDUCE-2618. Fix NPE in 0 map 0 reduce jobs. (Jeffrey Naisbitt via llu)
+
     Fix some invalid transitions in the RM. (vinodkv via ddas)
 
     Fix diagnostics display for more than 100 apps in RM. (llu)

+ 3 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java

@@ -35,6 +35,9 @@ public enum JobEventType {
   JOB_MAP_TASK_RESCHEDULED,
   JOB_TASK_ATTEMPT_COMPLETED,
 
+  //Producer:Job
+  JOB_COMPLETED,
+
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,

+ 68 - 17
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -230,6 +230,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
+          .addTransition
+              (JobState.RUNNING,
+              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              JobEventType.JOB_COMPLETED,
+              new JobNoTasksCompletedTransition())
           .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
           .addTransition(JobState.RUNNING, JobState.RUNNING,
@@ -393,6 +398,19 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     return jobId;
   }
 
+  // Getter methods that make unit testing easier (package-scoped)
+  OutputCommitter getCommitter() {
+    return this.committer;
+  }
+
+  EventHandler getEventHandler() {
+    return this.eventHandler;
+  }
+
+  JobContext getJobContext() {
+    return this.jobContext;
+  }
+
   @Override
   public boolean checkAccess(UserGroupInformation callerUGI, 
       JobACL jobOperation) {
@@ -687,11 +705,34 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     metrics.waitingTask(task);
   }
 
-  private void setFinishTime() {
+  void setFinishTime() {
     finishTime = clock.getTime();
   }
+
+  void logJobHistoryFinishedEvent() {
+    this.setFinishTime();
+    JobFinishedEvent jfe = createJobFinishedEvent(this);
+    LOG.info("Calling handler for JobFinishedEvent ");
+    this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));
+  }
 
-  private JobState finished(JobState finalState) {
+  static JobState checkJobCompleteSuccess(JobImpl job) {
+    // check for Job success
+    if (job.completedTaskCount == job.getTasks().size()) {
+      try {
+        // Commit job & do cleanup
+        job.getCommitter().commitJob(job.getJobContext());
+      } catch (IOException e) {
+        LOG.warn("Could not do commit for Job", e);
+      }
+
+      job.logJobHistoryFinishedEvent();
+      return job.finished(JobState.SUCCEEDED);
+    }
+    return null;
+  }
+
+  JobState finished(JobState finalState) {
     if (getState() == JobState.RUNNING) {
       metrics.endRunningJob(this);
     }
@@ -759,10 +800,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
         job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
-        
+
         if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
           job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
-          return job.finished(JobState.FAILED);
         }
 
         checkTaskLimits();
@@ -1064,6 +1104,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           job.submitTime, job.startTime);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
       job.metrics.runningJob(job);
+
+      // If we have no tasks, just transition to job completed
+      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
+        job.eventHandler.handle(new JobEvent(job.jobId, JobEventType.JOB_COMPLETED));
+      }
     }
   }
 
 
@@ -1237,19 +1282,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         return job.finished(JobState.FAILED);
       }
 
-      //check for Job success
-      if (job.completedTaskCount == job.tasks.size()) {
-        try {
-          job.committer.commitJob(job.jobContext);
-        } catch (IOException e) {
-          LOG.warn("Could not do commit for Job", e);
-        }
-       // Log job-history
-        job.setFinishTime();
-        JobFinishedEvent jfe = createJobFinishedEvent(job);
-        LOG.info("Calling handler for JobFinishedEvent ");
-        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
-        return job.finished(JobState.SUCCEEDED);
+      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      if (jobCompleteSuccess != null) {
+        return jobCompleteSuccess;
       }
 
       //return the current state, Job not finished yet
@@ -1285,6 +1320,22 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
+  // Transition class for handling jobs with no tasks
+  static class JobNoTasksCompletedTransition implements
+  MultipleArcTransition<JobImpl, JobEvent, JobState> {
+
+    @Override
+    public JobState transition(JobImpl job, JobEvent event) {
+      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      if (jobCompleteSuccess != null) {
+        return jobCompleteSuccess;
+      }
+
+      // Return the current state, Job not finished yet
+      return job.getState();
+    }
+  }
+
   private static class MapTaskRescheduledTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override

+ 2 - 2
mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -59,10 +59,10 @@ public class TestMRApp {
   }
 
   @Test
-  public void testZeroMapReduces() throws Exception {
+  public void testZeroMapReduces() throws Exception{
     MRApp app = new MRApp(0, 0, true, this.getClass().getName(), true);
     Job job = app.submit(new Configuration());
-    app.waitForState(job, JobState.FAILED);
+    app.waitForState(job, JobState.SUCCEEDED);
   }
 
   @Test

+ 136 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+import org.junit.Assert;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.any;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+
+/**
+ * Tests various functions of the JobImpl class
+ */
+public class TestJobImpl {
+  
+  @Test
+  public void testJobNoTasksTransition() { 
+    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Force checkJobCompleteSuccess to return null
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    when(mockJob.getTasks()).thenReturn(tasks);
+
+    when(mockJob.getState()).thenReturn(JobState.ERROR);
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    JobState state = trans.transition(mockJob, mockJobEvent);
+    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
+        JobState.ERROR, state);
+  }
+  
+  @Test
+  public void testCheckJobCompleteSuccess() {
+    
+    JobImpl mockJob = mock(JobImpl.class);
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    JobContext mockJobContext = mock(JobContext.class);
+    
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
+    when(mockJob.getJobContext()).thenReturn(mockJobContext);
+    doNothing().when(mockJob).setFinishTime();
+    doNothing().when(mockJob).logJobHistoryFinishedEvent();
+    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+
+    try {
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+    } catch (IOException e) {
+      // commitJob stubbed out, so this can't happen
+    }
+    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
+      "for successful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
+        JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);    
+  }
+
+  @Test
+  public void testCheckJobCompleteSuccessFailed() {
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Make the completedTasks not equal the getTasks()
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    when(mockJob.getTasks()).thenReturn(tasks);
+    
+    try {
+      // Just in case the code breaks and reaches these calls
+      OutputCommitter mockCommitter = mock(OutputCommitter.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+      doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    } catch (IOException e) {
+      e.printStackTrace();    
+    }
+    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
+      "for unsuccessful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    TestJobImpl t = new TestJobImpl();
+    t.testJobNoTasksTransition();
+    t.testCheckJobCompleteSuccess();
+    t.testCheckJobCompleteSuccessFailed();
+  }
+}