MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM. Contributed by Chang Li
(cherry picked from commit 30da99cbaf36aeef38a858251ce8ffa5eb657b38)

Jason Lowe, 10 years ago
parent commit 24d8c6f355

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -210,6 +210,9 @@ Release 2.7.0 - UNRELEASED
 
     MAPREDUCE-4742. Fix typo in nnbench#displayUsage. (Liang Xie via ozawa)
 
+    MAPREDUCE-6277. Job can post multiple history files if attempt loses
+    connection to the RM (Chang Li via jlowe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -698,7 +698,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
         LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                         JobEventType.INTERNAL_ERROR));
+                                         JobEventType.JOB_AM_REBOOT));
         throw new YarnRuntimeException("Could not contact RM after " +
                                 retryInterval + " milliseconds.");
       }
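
For context, a minimal sketch of how the retry window checked in this hunk is assumed to be wired up. MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, which the new test below sets to 0, bounds how long the allocator keeps retrying the RM before it emits the job event and throws; the exact field initialization and the DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS default shown here are assumptions for illustration, not part of this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class RetryWindowSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed wiring of the fields referenced in the hunk above:
    // how long the AM keeps retrying the RM before giving up
    // (the new test sets this interval to 0 so the first failed heartbeat trips it).
    long retryInterval = conf.getLong(
        MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
        MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
    // Reset whenever a heartbeat to the RM succeeds, then compared
    // against the current time once a heartbeat fails.
    long retrystartTime = System.currentTimeMillis();
    System.out.println("retryInterval=" + retryInterval
        + ", retrystartTime=" + retrystartTime);
  }
}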

+ 61 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -65,6 +65,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -1608,6 +1610,7 @@ public class TestRMContainerAllocator {
       = new ArrayList<TaskAttemptKillEvent>();
     static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
     = new ArrayList<JobUpdatedNodesEvent>();
+    static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
     private AllocateResponse allocateResponse;
@@ -1630,6 +1633,8 @@ public class TestRMContainerAllocator {
             taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
           } else if (event instanceof JobUpdatedNodesEvent) {
             jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
+          } else if (event instanceof JobEvent) {
+            jobEvents.add((JobEvent)event);
           }
         }
       });
@@ -1782,6 +1787,18 @@ public class TestRMContainerAllocator {
     }
   }
 
+  private static class MyContainerAllocator2 extends MyContainerAllocator {
+    public MyContainerAllocator2(MyResourceManager rm, Configuration conf,
+      ApplicationAttemptId appAttemptId, Job job) {
+      super(rm, conf, appAttemptId, job);
+    }
+    @Override
+    protected AllocateResponse makeRemoteRequest() throws IOException,
+      YarnException {
+      throw new YarnRuntimeException("for testing");
+    }
+  }
+
   @Test
   public void testReduceScheduling() throws Exception {
     int totalMaps = 10;
@@ -2307,6 +2324,50 @@ public class TestRMContainerAllocator {
 
   }
 
+  @Test
+  public void testRMUnavailable()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(
+      MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
+    MyResourceManager rm1 = new MyResourceManager(conf);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+        0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator2 allocator =
+        new MyContainerAllocator2(rm1, conf, appAttemptId, mockJob);
+    allocator.jobEvents.clear();
+    try {
+      allocator.schedule();
+      Assert.fail("Should Have Exception");
+    } catch (YarnRuntimeException e) {
+      Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
+    }
+    dispatcher.await();
+    Assert.assertEquals("Should Have 1 Job Event", 1,
+        allocator.jobEvents.size());
+    JobEvent event = allocator.jobEvents.get(0); 
+    Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT));
+  }
+
   @Test(timeout=60000)
   public void testAMRMTokenUpdate() throws Exception {
     LOG.info("Running testAMRMTokenUpdate");