MAPREDUCE-7042. Killed MR job data does not move to mapreduce.jobhistory.done-dir when ATS v2 is enabled. Contributed by Rohith Sharma K S.

Sunil G committed 7 years ago
commit 83e60cd2db

+ 58 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -119,7 +120,11 @@ public class JobHistoryEventHandler extends AbstractService
 
   protected BlockingQueue<JobHistoryEvent> eventQueue =
     new LinkedBlockingQueue<JobHistoryEvent>();
+
+  protected boolean handleTimelineEvent = false;
+  protected AsyncDispatcher atsEventDispatcher = null;
   protected Thread eventHandlingThread;
+
   private volatile boolean stopped;
   private final Object lock = new Object();
 
@@ -279,6 +284,7 @@ public class JobHistoryEventHandler extends AbstractService
               ((MRAppMaster.RunningAppContext) context).getTimelineClient();
           timelineClient.init(conf);
         }
+        handleTimelineEvent = true;
         LOG.info("Timeline service is enabled; version: " +
             YarnConfiguration.getTimelineServiceVersion(conf));
       } else {
@@ -302,10 +308,23 @@ public class JobHistoryEventHandler extends AbstractService
           "'json' or 'binary'.  Falling back to default value '" +
           JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'.");
     }
-
+    // initiate the atsEventDispatcher for timeline event
+    // if timeline service is enabled.
+    if (handleTimelineEvent) {
+      atsEventDispatcher = createDispatcher();
+      EventHandler<JobHistoryEvent> timelineEventHandler =
+          new ForwardingEventHandler();
+      atsEventDispatcher.register(EventType.class, timelineEventHandler);
+      atsEventDispatcher.setDrainEventsOnStop();
+      atsEventDispatcher.init(conf);
+    }
     super.serviceInit(conf);
   }
 
+  protected AsyncDispatcher createDispatcher() {
+    return new AsyncDispatcher("Job ATS Event Dispatcher");
+  }
+
   private void mkdir(FileSystem fs, Path path, FsPermission fsp)
       throws IOException {
     if (!fs.exists(path)) {
@@ -371,6 +390,10 @@ public class JobHistoryEventHandler extends AbstractService
         }
     }, "eventHandlingThread");
     eventHandlingThread.start();
+
+    if (handleTimelineEvent) {
+      atsEventDispatcher.start();
+    }
     super.serviceStart();
   }
 
@@ -461,6 +484,11 @@ public class JobHistoryEventHandler extends AbstractService
         LOG.info("Exception while closing file " + e.getMessage());
       }
     }
+
+    if (handleTimelineEvent && atsEventDispatcher != null) {
+      atsEventDispatcher.stop();
+    }
+
     if (timelineClient != null) {
       timelineClient.stop();
     } else if (timelineV2Client != null) {
@@ -580,6 +608,10 @@ public class JobHistoryEventHandler extends AbstractService
       }
 
       eventQueue.put(event);
+      // Process it for ATS (if enabled)
+      if (handleTimelineEvent) {
+        atsEventDispatcher.getEventHandler().handle(event);
+      }
     } catch (InterruptedException e) {
       throw new YarnRuntimeException(e);
     }
@@ -622,13 +654,6 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        if (timelineV2Client != null) {
-          processEventForNewTimelineService(historyEvent, event.getJobID(),
-              event.getTimestamp());
-        } else if (timelineClient != null) {
-          processEventForTimelineServer(historyEvent, event.getJobID(),
-              event.getTimestamp());
-        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
               + event.getHistoryEvent().getEventType());
@@ -710,6 +735,23 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
+  private void handleTimelineEvent(JobHistoryEvent event) {
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (handleTimelineEvent) {
+      if (timelineV2Client != null) {
+        processEventForNewTimelineService(historyEvent, event.getJobID(),
+            event.getTimestamp());
+      } else if (timelineClient != null) {
+        processEventForTimelineServer(historyEvent, event.getJobID(),
+            event.getTimestamp());
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In HistoryEventHandler, handle timelineEvent:"
+          + event.getHistoryEvent().getEventType());
+    }
+  }
+
   public void processEventForJobSummary(HistoryEvent event, JobSummary summary, 
       JobId jobId) {
     // context.getJob could be used for some of this info as well.
@@ -1745,4 +1787,12 @@ public class JobHistoryEventHandler extends AbstractService
   boolean getFlushTimerStatus() {
     return isTimerActive;
   }
+
+  private final class ForwardingEventHandler
+      implements EventHandler<JobHistoryEvent> {
+    @Override
+    public void handle(JobHistoryEvent event) {
+      handleTimelineEvent(event);
+    }
+  }
 }
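
The hunks above decouple timeline (ATS) publishing from .jhist handling: handleEvent() still queues every event for the history writer, but when the timeline service is enabled it also forwards the event to a dedicated AsyncDispatcher ("Job ATS Event Dispatcher"), and the inline timelineV2Client/timelineClient calls are removed from the history thread. What follows is a minimal, self-contained sketch of that pattern using plain java.util.concurrent instead of Hadoop's AsyncDispatcher; all class and method names in the sketch are illustrative, not the Hadoop API.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
 * Sketch of the decoupling introduced above: history events are still written
 * by the history thread, while timeline (ATS) publishing is forwarded to a
 * second, independent dispatcher thread, so a slow or failing timeline client
 * can no longer keep the .jhist file from being flushed, closed and moved to
 * the done-dir. Illustrative names only.
 */
public class DecoupledHistoryHandlerSketch {

  private final BlockingQueue<String> historyQueue = new LinkedBlockingQueue<>();
  private final BlockingQueue<String> timelineQueue = new LinkedBlockingQueue<>();
  private final boolean timelineEnabled;

  public DecoupledHistoryHandlerSketch(boolean timelineEnabled) {
    this.timelineEnabled = timelineEnabled;
  }

  public void start() {
    startWorker(historyQueue, this::writeToJhist, "history-thread");
    if (timelineEnabled) {
      // Counterpart of atsEventDispatcher: its own thread, its own queue.
      startWorker(timelineQueue, this::publishToTimeline, "ats-dispatcher");
    }
  }

  /** Counterpart of handleEvent(): enqueue for history, forward to ATS if enabled. */
  public void handle(String event) throws InterruptedException {
    historyQueue.put(event);
    if (timelineEnabled) {
      timelineQueue.put(event);
    }
  }

  private void startWorker(BlockingQueue<String> queue, Consumer<String> sink,
      String name) {
    Thread t = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          sink.accept(queue.take());
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }, name);
    t.setDaemon(true);
    t.start();
  }

  private void writeToJhist(String event) {
    System.out.println("append to .jhist: " + event);
  }

  private void publishToTimeline(String event) {
    System.out.println("publish to ATS: " + event);
  }
}

With this split, stopping the handler can drain the history queue and move the killed job's .jhist data to mapreduce.jobhistory.done-dir regardless of what the timeline side is doing, which is the behaviour the commit message describes.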

+ 29 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
@@ -589,6 +591,7 @@ public class TestJobHistoryEventHandler {
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
               t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
               currentTime - 10));
+      jheh.getDispatcher().await();
       TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
               null, null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -605,6 +608,7 @@ public class TestJobHistoryEventHandler {
               "user", 200, "/foo/job.xml",
               new HashMap<JobACL, AccessControlList>(), "default"),
               currentTime + 10));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -623,6 +627,7 @@ public class TestJobHistoryEventHandler {
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
               new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
               currentTime - 20));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -645,6 +650,7 @@ public class TestJobHistoryEventHandler {
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
               new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
               0, 0, 0, new Counters(), new Counters(), new Counters()), currentTime));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -672,6 +678,7 @@ public class TestJobHistoryEventHandler {
             new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
             0, 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString()),
             currentTime + 20));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -701,6 +708,7 @@ public class TestJobHistoryEventHandler {
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
             new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -714,6 +722,7 @@ public class TestJobHistoryEventHandler {
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
             new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
+      jheh.getDispatcher().await();
       entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
               null, null, null, null, null, null);
       Assert.assertEquals(1, entities.getEntities().size());
@@ -1031,6 +1040,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
   private boolean mockHistoryProcessing = true;
+  private DrainDispatcher dispatcher;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
     JobHistoryEventHandler.fileMap.clear();
@@ -1042,6 +1052,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
     JobHistoryEventHandler.fileMap.clear();
   }
 
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+
+  }
+
   @Override
   protected void serviceStart() {
     if (timelineClient != null) {
@@ -1049,6 +1065,19 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
     } else if (timelineV2Client != null) {
       timelineV2Client.start();
     }
+    if (handleTimelineEvent) {
+      atsEventDispatcher.start();
+    }
+  }
+
+  @Override
+  protected AsyncDispatcher createDispatcher() {
+    dispatcher = new DrainDispatcher();
+    return dispatcher;
+  }
+
+  public DrainDispatcher getDispatcher() {
+    return dispatcher;
   }
 
   @Override
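
Because timeline publishing now runs on its own dispatcher thread, the test swaps in a DrainDispatcher and calls jheh.getDispatcher().await() after each event so the timeline store is fully updated before the assertions run. Below is a minimal sketch of that drain-then-assert idea, again with plain java.util.concurrent rather than Hadoop's DrainDispatcher, and with illustrative names.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Drain-then-assert sketch: await() returns only after every event that was
 * dispatched has been fully handled, mirroring what DrainDispatcher.await()
 * gives the test above. Illustrative only, not the Hadoop class.
 */
final class DrainableDispatcherSketch {
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  private final AtomicInteger pending = new AtomicInteger(); // dispatched, not yet handled

  void start() {
    Thread worker = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          Runnable event = queue.take();
          event.run();
          pending.decrementAndGet();
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }, "drainable-dispatcher");
    worker.setDaemon(true);
    worker.start();
  }

  void dispatch(Runnable event) {
    pending.incrementAndGet();
    queue.add(event);
  }

  /** Spin until all previously dispatched events have been handled. */
  void await() throws InterruptedException {
    while (pending.get() > 0) {
      Thread.sleep(10);
    }
  }
}

A test would dispatch an event, call await(), and only then query the store, which is exactly the pattern the added jheh.getDispatcher().await() lines follow.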