Browse Source

MAPREDUCE-6018. Added an MR specific config to enable emitting job history data to the timeline server. Contributed by Robert Kanter.

(cherry picked from commit 971e91c8c03a23e4613ed3f071b4f982ee5a1b63)
Zhijie Shen 10 years ago
parent
commit
d92d4e32f2

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -49,6 +49,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5970. Provide a boolean switch to enable MR-AM profiling (Gera
     Shegalov via jlowe)
 
+    MAPREDUCE-6018. Added an MR specific config to enable emitting job history
+    data to the timeline server. (Robert Kanter via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 18 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -241,8 +241,14 @@ public class JobHistoryEventHandler extends AbstractService
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
 
-    timelineClient = TimelineClient.createTimelineClient();
-    timelineClient.init(conf);
+    if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+        MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
+      timelineClient = TimelineClient.createTimelineClient();
+      timelineClient.init(conf);
+      LOG.info("Emitting job history data to the timeline server is enabled");
+    } else {
+      LOG.info("Emitting job history data to the timeline server is not enabled");
+    }
 
     super.serviceInit(conf);
   }
@@ -268,7 +274,9 @@ public class JobHistoryEventHandler extends AbstractService
 
   @Override
   protected void serviceStart() throws Exception {
-    timelineClient.start();
+    if (timelineClient != null) {
+      timelineClient.start();
+    }
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -537,7 +545,7 @@ public class JobHistoryEventHandler extends AbstractService
       // For all events
       // (1) Write it out
       // (2) Process it for JobSummary
-      // (3) Process it for ATS
+      // (3) Process it for ATS (if enabled)
       MetaInfo mi = fileMap.get(event.getJobID());
       try {
         HistoryEvent historyEvent = event.getHistoryEvent();
@@ -546,8 +554,10 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        processEventForTimelineServer(historyEvent, event.getJobID(),
-                event.getTimestamp());
+        if (timelineClient != null) {
+          processEventForTimelineServer(historyEvent, event.getJobID(),
+              event.getTimestamp());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
               + event.getHistoryEvent().getEventType());
@@ -839,8 +849,8 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
         tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
         tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
-                tfe2.getSuccessfulTaskAttemptId() == null ?
-                "" : tfe2.getSuccessfulTaskAttemptId().toString());
+            tfe2.getSuccessfulTaskAttemptId() == null ?
+            "" : tfe2.getSuccessfulTaskAttemptId().toString());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tfe2.getTaskId().toString());
         tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -378,6 +378,11 @@ public interface MRJobConfig {
   public static final String JOB_UBERTASK_MAXBYTES =
     "mapreduce.job.ubertask.maxbytes";
 
+  public static final String MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+    "mapreduce.job.emit-timeline-data";
+  public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+      false;
+
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 
   public static final String MR_AM_PREFIX = MR_PREFIX + "am.";

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -833,6 +833,14 @@
   </description>
 </property>
 
+<property>
+    <name>mapreduce.job.emit-timeline-data</name>
+    <value>false</value>
+    <description>Specifies if the Application Master should emit timeline data
+    to the timeline server. Individual jobs can override this value.
+    </description>
+</property>
+
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>

+ 80 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
@@ -85,4 +86,83 @@ public class TestMRTimelineEventHandling {
     }
   }
 
+  @Test
+  public void testMapreduceJobTimelineServiceEnabled()
+      throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+          .getTimelineStore();
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+          null, null, null, null, null, null);
+      Assert.assertEquals(0, entities.getEntities().size());
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+      job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+          null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+    cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+          .getTimelineStore();
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+          null, null, null, null, null, null);
+      Assert.assertEquals(0, entities.getEntities().size());
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+      job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+          null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+  }
 }