Browse Source

svn merge -c 1383709 FIXES: MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly to clients in case of failed jobs also. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1390700 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 12 years ago
parent
commit
3cf78561e7

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -32,6 +32,9 @@ Release 0.23.4 - UNRELEASED
     MAPREDUCE-4610. Support deprecated mapreduce.job.counters.limit property in
     MR2. (tomwhite)
 
+    MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly
+    to clients in case of failed jobs also. (Jason Lowe via vinodkv)
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 10 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -554,17 +554,23 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       String jobFile =
           remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
 
+      StringBuilder diagsb = new StringBuilder();
+      for (String s : getDiagnostics()) {
+        diagsb.append(s).append("\n");
+      }
+
       if (getState() == JobState.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
-            cleanupProgress, jobFile, amInfos, isUber);
+            cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
       }
 
       computeProgress();
-      return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
-          appSubmitTime, startTime, finishTime, setupProgress,
+      JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username,
+          state, appSubmitTime, startTime, finishTime, setupProgress,
           this.mapProgress, this.reduceProgress,
-          cleanupProgress, jobFile, amInfos, isUber);
+          cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
+      return report;
     } finally {
       readLock.unlock();
     }

+ 7 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -136,7 +136,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -213,7 +213,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -279,7 +279,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -639,7 +639,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -743,7 +743,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator =
         new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
 
@@ -909,7 +909,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -1346,7 +1346,7 @@ public class TestRMContainerAllocator {
     Job job = mock(Job.class);
     when(job.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     doReturn(10).when(job).getTotalMaps();
     doReturn(10).when(job).getTotalReduces();
     doReturn(0).when(job).getCompletedMaps();

+ 40 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -45,11 +45,14 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Records;
@@ -172,6 +175,8 @@ public class TestJobImpl {
     t.testCheckJobCompleteSuccess();
     t.testCheckJobCompleteSuccessFailed();
     t.testCheckAccess();
+    t.testReportDiagnostics();
+    t.testUberDecision();
   }
 
   @Test
@@ -241,6 +246,41 @@ public class TestJobImpl {
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
+
+  @Test
+  public void testReportDiagnostics() throws Exception {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    final String diagMsg = "some diagnostic message";
+    final JobDiagnosticsUpdateEvent diagUpdateEvent =
+        new JobDiagnosticsUpdateEvent(jobId, diagMsg);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), new Configuration(),
+        mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null,
+        new SystemClock(), null,
+        mrAppMetrics, mock(OutputCommitter.class),
+        true, null, 0, null, null);
+    job.handle(diagUpdateEvent);
+    String diagnostics = job.getReport().getDiagnostics();
+    Assert.assertNotNull(diagnostics);
+    Assert.assertTrue(diagnostics.contains(diagMsg));
+
+    job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), new Configuration(),
+        mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null,
+        new SystemClock(), null,
+        mrAppMetrics, mock(OutputCommitter.class),
+        true, null, 0, null, null);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    job.handle(diagUpdateEvent);
+    diagnostics = job.getReport().getDiagnostics();
+    Assert.assertNotNull(diagnostics);
+    Assert.assertTrue(diagnostics.contains(diagMsg));
+  }
+
   @Test
   public void testUberDecision() throws Exception {
 

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java

@@ -67,7 +67,7 @@ public class MRBuilderUtils {
       String userName, JobState state, long submitTime, long startTime, long finishTime,
       float setupProgress, float mapProgress, float reduceProgress,
       float cleanupProgress, String jobFile, List<AMInfo> amInfos,
-      boolean isUber) {
+      boolean isUber, String diagnostics) {
     JobReport report = Records.newRecord(JobReport.class);
     report.setJobId(jobId);
     report.setJobName(jobName);
@@ -83,6 +83,7 @@ public class MRBuilderUtils {
     report.setJobFile(jobFile);
     report.setAMInfos(amInfos);
     report.setIsUber(isUber);
+    report.setDiagnostics(diagnostics);
     return report;
   }
 

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

@@ -218,7 +218,8 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
     when(jobReportResponse1.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
+            false, ""));
 
     // First AM returns a report with jobName firstGen and simulates AM shutdown
     // on second invocation.
@@ -230,7 +231,8 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
     when(jobReportResponse2.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
+            false, ""));
 
     // Second AM generation returns a report with jobName secondGen
     MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);