Browse Source

svn merge -c 1588559 FIXES: MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly. Contributed by Eric Payne

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1588561 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 11 years ago
parent
commit
c8d5d1f82a

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -41,6 +41,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5775. Remove unnecessary job.setNumReduceTasks in SleepJob.createJob 
     (jhanver chand sharma via devaraj)
 
+    MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly
+    (Eric Payne via jlowe)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 13 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -1040,6 +1040,7 @@ public class MRAppMaster extends CompositeService {
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
+    boolean initFailed = false;
     if (!errorHappenedShutDown) {
       // create a job event for job initialization
       JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
@@ -1048,6 +1049,10 @@ public class MRAppMaster extends CompositeService {
       // job-init to be done completely here.
       jobEventDispatcher.handle(initJobEvent);
 
+      // If job is still not initialized, an error happened during
+      // initialization. Must complete starting all of the services so failure
+      // events can be processed.
+      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
 
       // JobImpl's InitTransition is done (call above is synchronous), so the
       // "uber-decision" (MR-1220) has been made.  Query job and switch to
@@ -1076,8 +1081,14 @@ public class MRAppMaster extends CompositeService {
 
     // set job classloader if configured
     MRApps.setJobClassLoader(getConfig());
-    // All components have started, start the job.
-    startJobs();
+
+    if (initFailed) {
+      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
+      jobEventDispatcher.handle(initFailedEvent);
+    } else {
+      // All components have started, start the job.
+      startJobs();
+    }
   }
   
   @Override

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java

@@ -28,6 +28,7 @@ public enum JobEventType {
 
   //Producer:MRAppMaster
   JOB_INIT,
+  JOB_INIT_FAILED,
   JOB_START,
 
   //Producer:Task

+ 28 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -250,9 +250,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
               (JobStateInternal.NEW,
-              EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
               JobEventType.JOB_INIT,
               new InitTransition())
+          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_INIT_FAILED,
+              new InitFailedTransition())
           .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillNewJobTransition())
@@ -265,7 +268,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           // Ignore-able events
           .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
-              
+
           // Transitions from INITED state
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -1374,6 +1377,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
+
+      if (job.newApiCommitter) {
+        job.jobContext = new JobContextImpl(job.conf,
+            job.oldJobId);
+      } else {
+        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+            job.conf, job.oldJobId);
+      }
+      
       try {
         setup(job);
         job.fs = job.getFileSystem(job.conf);
@@ -1409,14 +1421,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
         checkTaskLimits();
 
-        if (job.newApiCommitter) {
-          job.jobContext = new JobContextImpl(job.conf,
-              job.oldJobId);
-        } else {
-          job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-              job.conf, job.oldJobId);
-        }
-        
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
@@ -1443,15 +1447,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
         job.metrics.endPreparingJob(job);
         return JobStateInternal.INITED;
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOG.warn("Job init failed", e);
         job.metrics.endPreparingJob(job);
         job.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
-        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-            job.jobContext,
-            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-        return JobStateInternal.FAILED;
+        // Leave job in the NEW state. The MR AM will detect that the state is
+        // not INITED and send a JOB_INIT_FAILED event.
+        return JobStateInternal.NEW;
       }
     }
 
@@ -1552,6 +1555,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   } // end of InitTransition
 
+  /**
+   * Transition taken when a JOB_INIT_FAILED event arrives: hands the job's
+   * event handler a CommitterJobAbortEvent carrying this job's id, its job
+   * context and a final status of FAILED.
+   */
+  private static class InitFailedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      // Build the abort request first, then dispatch it.
+      final CommitterJobAbortEvent abortEvent = new CommitterJobAbortEvent(
+          job.jobId, job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+      job.eventHandler.handle(abortEvent);
+    }
+  }
+
   private static class SetupCompletedTransition
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override

+ 30 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -728,6 +729,35 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  static final String EXCEPTIONMSG = "Splits max exceeded";
+  /**
+   * Verifies that when split creation throws during job initialization,
+   * InitTransition returns the NEW state and the job diagnostics contain
+   * both the exception type name and its message.
+   */
+  @Test
+  public void testMetaInfoSizeOverMax() throws Exception {
+    Configuration conf = new Configuration();
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job =
+        new JobImpl(jobId, ApplicationAttemptId.newInstance(
+          ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class),
+          null, new JobTokenSecretManager(), new Credentials(), null, null,
+          mrAppMetrics, null, true, null, 0, null, null, null, null);
+    // Force split creation to fail with an unchecked exception.
+    InitTransition initTransition = new InitTransition() {
+        @Override
+        protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+          throw new YarnRuntimeException(EXCEPTIONMSG);
+        }
+      };
+    JobEvent mockJobEvent = mock(JobEvent.class);
+
+    JobStateInternal jobSI = initTransition.transition(job, mockJobEvent);
+    // assertEquals (rather than assertTrue on equals) reports the actual
+    // state when the check fails.
+    Assert.assertEquals(
+        "When init fails, return value from InitTransition.transition "
+            + "should equal NEW.",
+        JobStateInternal.NEW, jobSI);
+
+    // Render the diagnostics once and surface them in failure messages.
+    String diagnostics = job.getDiagnostics().toString();
+    Assert.assertTrue(
+        "Job diagnostics should contain YarnRuntimeException but was: "
+            + diagnostics,
+        diagnostics.contains("YarnRuntimeException"));
+    Assert.assertTrue(
+        "Job diagnostics should contain " + EXCEPTIONMSG + " but was: "
+            + diagnostics,
+        diagnostics.contains(EXCEPTIONMSG));
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = new SystemClock();