浏览代码

MAPREDUCE-2615. Make killJob go through AM and fix JobSummaryLog. (Siddharth Seth via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1138689 13f79535-47bb-0310-9956-ffa450edef68
Luke Lu 14 年之前
父节点
当前提交
040a35c461

+ 3 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
     MAPREDUCE-279
 
 
+    MAPREDUCE-2615. Make killJob go through AM and fix JobSummaryLog.
+    (Siddharth Seth via llu)
+
     Fix for NPE in YarnChild that was causing lots of tasks to fail. (vinodkv)
     Fix for NPE in YarnChild that was causing lots of tasks to fail. (vinodkv)
 
 
     Fix for ConcurrentModification exception while iterating through tokens in
     Fix for ConcurrentModification exception while iterating through tokens in

+ 5 - 4
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -385,7 +385,7 @@ public class JobHistoryEventHandler extends AbstractService
       try {
       try {
         HistoryEvent historyEvent = event.getHistoryEvent();
         HistoryEvent historyEvent = event.getHistoryEvent();
         mi.writeEvent(historyEvent);
         mi.writeEvent(historyEvent);
-        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary());
+        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID());
         LOG.info("In HistoryEventHandler "
         LOG.info("In HistoryEventHandler "
             + event.getHistoryEvent().getEventType());
             + event.getHistoryEvent().getEventType());
       } catch (IOException e) {
       } catch (IOException e) {
@@ -427,7 +427,7 @@ public class JobHistoryEventHandler extends AbstractService
     }
     }
   }
   }
 
 
-  private void processEventForJobSummary(HistoryEvent event, JobSummary summary) {
+  private void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) {
     // context.getJob could be used for some of this info as well.
     // context.getJob could be used for some of this info as well.
     switch (event.getEventType()) {
     switch (event.getEventType()) {
     case JOB_SUBMITTED:
     case JOB_SUBMITTED:
@@ -467,9 +467,10 @@ public class JobHistoryEventHandler extends AbstractService
     case JOB_KILLED:
     case JOB_KILLED:
       JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
       JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
       summary.setJobStatus(juce.getStatus());
       summary.setJobStatus(juce.getStatus());
+      summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
+      summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
+      summary.setJobFinishTime(juce.getFinishTime());
       break;
       break;
-    // TODO Verify: MRV2 + MRV1. A JOB_FINISHED event will always come in after
-    // this. Stats on taskCounts can be set via that.
     }
     }
   }
   }
 
 

+ 35 - 0
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -468,4 +469,38 @@ public class ClientServiceDelegate {
     }
     }
     return true;
     return true;
   }
   }
+  
+  /**
+   * Sends a kill request for the given job through its MRClientProtocol proxy.
+   * If the first attempt fails with anything other than a
+   * YarnRemoteException, the proxy is refreshed and the request is sent once
+   * more through the refreshed proxy.
+   *
+   * @param oldJobID id of the job to kill
+   * @return true if the kill request was sent successfully; false if no proxy
+   *         could be obtained, so the caller can fall back to another way of
+   *         killing the job
+   * @throws YarnRemoteException if the remote server reports a failure
+   */
+  public boolean killJob(JobID oldJobID)
+  throws YarnRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId 
+    = TypeConverter.toYarn(oldJobID);
+    KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
+    // The same request object is reused for the retry, so set the id once.
+    killRequest.setJobId(jobId);
+    MRClientProtocol protocol = getProxy(oldJobID);
+    if (protocol == null) {
+      return false;
+    }
+    try {
+      protocol.killJob(killRequest);
+      return true;
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
+    } catch(Exception e) {
+      // Not really required - if this is always the history context.
+      LOG.debug("Failed to contact application master ", e);
+      MRClientProtocol proxy = getRefreshedProxy(oldJobID);
+      if (proxy == null) {
+        return false;
+      }
+      try {
+        // Retry on the refreshed proxy; the original proxy just failed.
+        proxy.killJob(killRequest);
+        return true;
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
+    }
+  }
 }
 }

+ 2 - 0
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -537,8 +537,10 @@ public class YARNRunner implements ClientProtocol {
 
 
   @Override
   @Override
   public void killJob(JobID arg0) throws IOException, InterruptedException {
   public void killJob(JobID arg0) throws IOException, InterruptedException {
+    if (!clientServiceDelegate.killJob(arg0)) {
     resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
     resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
   }
   }
+  }
 
 
   @Override
   @Override
   public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
   public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,