Просмотр исходного кода

MAPREDUCE-3146. Added a MR specific command line to dump logs for a given TaskAttemptID. Contributed by Siddharth Seth.
svn merge -c r1195349 --ignore-ancestry ../../trunk


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195353 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 лет назад
Родитель
Сommit
4b4b1c89cc
42 измененных файлов с 938 добавлено и 107 удалено
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 3 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  3. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
  4. 6 5
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  5. 14 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  6. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
  7. 15 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  8. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java
  9. 8 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  10. 8 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
  11. 35 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
  12. 10 10
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  13. 4 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
  14. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  15. 38 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/AMInfo.java
  16. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
  17. 10 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java
  18. 201 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/AMInfoPBImpl.java
  19. 106 34
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
  20. 80 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java
  21. 22 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
  22. 15 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
  23. 4 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
  24. 15 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
  25. 15 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
  26. 38 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
  27. 67 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/v2/LogParams.java
  28. 16 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
  29. 9 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
  30. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
  31. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
  32. 3 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
  33. 4 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
  34. 55 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
  35. 7 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  36. 2 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
  37. 47 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
  38. 22 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
  39. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  40. 27 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java
  41. 8 0
      hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
  42. 8 0
      hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -397,6 +397,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3171. normalize nodemanager native code compilation with common/hdfs
     native. (tucu)
 
+    MAPREDUCE-3146. Added a MR specific command line to dump logs for a
+    given TaskAttemptID. (Siddharth Seth via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

+ 3 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -48,9 +48,9 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -100,8 +100,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -130,9 +128,6 @@ public class MRAppMaster extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
 
-  private final RecordFactory recordFactory =
-    RecordFactoryProvider.getRecordFactory(null);
-
   private Clock clock;
   private final long startTime;
   private final long appSubmitTime;
@@ -758,8 +753,8 @@ public class MRAppMaster extends CompositeService {
       amInfos = new LinkedList<AMInfo>();
     }
     AMInfo amInfo =
-        new AMInfo(appAttemptID, startTime, containerID, nmHost, nmPort,
-            nmHttpPort);
+        MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
+            nmPort, nmHttpPort);
     amInfos.add(amInfo);
 
     // /////////////////// Create the job itself.

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java

@@ -23,7 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;

+ 6 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@@ -61,6 +60,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@@ -580,13 +580,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
       if (getState() == JobState.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
-            startTime, finishTime, setupProgress, 0.0f,
-            0.0f, cleanupProgress, remoteJobConfFile.toString());
+            appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
+            cleanupProgress, remoteJobConfFile.toString(), amInfos);
       }
 
       return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
-          startTime, finishTime, setupProgress, computeProgress(mapTasks),
-          computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString());
+          appSubmitTime, startTime, finishTime, setupProgress,
+          computeProgress(mapTasks), computeProgress(reduceTasks),
+          cleanupProgress, remoteJobConfFile.toString(), amInfos);
     } finally {
       readLock.unlock();
     }

+ 14 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -158,6 +159,8 @@ public abstract class TaskAttemptImpl implements
   private long finishTime;
   private WrappedProgressSplitsBlock progressSplitBlock;
   private int shufflePort = -1;
+  private String trackerName;
+  private int httpPort;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -423,7 +426,7 @@ public abstract class TaskAttemptImpl implements
     stateMachine;
 
   private ContainerId containerID;
-  private String nodeHostName;
+  private NodeId containerNodeId;
   private String containerMgrAddress;
   private String nodeHttpAddress;
   private WrappedJvmID jvmID;
@@ -763,6 +766,12 @@ public abstract class TaskAttemptImpl implements
       result.setPhase(reportedStatus.phase);
       result.setStateString(reportedStatus.stateString);
       result.setCounters(getCounters());
+      result.setContainerId(this.getAssignedContainerID());
+      result.setNodeManagerHost(trackerName);
+      result.setNodeManagerHttpPort(httpPort);
+      if (this.containerNodeId != null) {
+        result.setNodeManagerPort(this.containerNodeId.getPort());
+      }
       return result;
     } finally {
       readLock.unlock();
@@ -1001,8 +1010,8 @@ public abstract class TaskAttemptImpl implements
       final TaskAttemptContainerAssignedEvent cEvent = 
         (TaskAttemptContainerAssignedEvent) event;
       taskAttempt.containerID = cEvent.getContainer().getId();
-      taskAttempt.nodeHostName = cEvent.getContainer().getNodeId().getHost();
-      taskAttempt.containerMgrAddress = cEvent.getContainer().getNodeId()
+      taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
+      taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
           .toString();
       taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
       taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
@@ -1113,6 +1122,8 @@ public abstract class TaskAttemptImpl implements
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
                                                                   // Costly?
+      taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+      taskAttempt.httpPort = nodeHttpInetAddr.getPort();
       JobCounterUpdateEvent jce =
           new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
               .getJobId());

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.recover;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.Dispatcher;

+ 15 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -36,11 +36,11 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -158,13 +159,24 @@ public class RecoveryService extends CompositeService implements Recovery {
   public Set<TaskId> getCompletedTasks() {
     return completedTasks.keySet();
   }
-  
+
   @Override
   public List<AMInfo> getAMInfos() {
     if (jobInfo == null || jobInfo.getAMInfos() == null) {
       return new LinkedList<AMInfo>();
     }
-    return new LinkedList<AMInfo>(jobInfo.getAMInfos());
+    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+        .getAMInfos()) {
+      AMInfo amInfo =
+          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+              jhAmInfo.getNodeManagerHttpPort());
+
+      amInfos.add(amInfo);
+    }
+    return amInfos;
   }
 
   private void parse() throws IOException {

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java

@@ -24,7 +24,7 @@ import com.google.inject.Inject;
 
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
 
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;

+ 8 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -96,6 +96,10 @@ public class MRApp extends MRAppMaster {
 
   private File testWorkDir;
   private Path testAbsPath;
+  
+  public static String NM_HOST = "localhost";
+  public static int NM_PORT = 1234;
+  public static int NM_HTTP_PORT = 9999;
 
   private static final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -136,7 +140,8 @@ public class MRApp extends MRAppMaster {
   public MRApp(int maps, int reduces, boolean autoComplete, String testName, 
       boolean cleanOnStart, int startCount) {
     super(getApplicationAttemptId(applicationId, startCount), getContainerId(
-        applicationId, startCount), "testhost", 2222, 3333, System.currentTimeMillis());
+        applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System
+        .currentTimeMillis());
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -363,9 +368,9 @@ public class MRApp extends MRAppMaster {
         ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
         cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
         cId.setId(containerCount++);
-        NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+        NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
         Container container = BuilderUtils.newContainer(cId, nodeId,
-            "localhost:9999", null, null, null);
+            NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
         JobID id = TypeConverter.fromYarn(applicationId);
         JobId jobId = TypeConverter.toYarn(id);
         getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 

+ 8 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java

@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.MockApps;
@@ -88,6 +89,10 @@ public class MockJobs extends MockApps {
   static final Iterator<String> DIAGS = Iterators.cycle(
       "Error: java.lang.OutOfMemoryError: Java heap space",
       "Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+  
+  public static final String NM_HOST = "localhost";
+  public static final int NM_PORT = 1234;
+  public static final int NM_HTTP_PORT = 9999;
 
   static final int DT = 1000000; // ms
 
@@ -507,8 +512,7 @@ public class MockJobs extends MockApps {
         BuilderUtils.newApplicationAttemptId(
             BuilderUtils.newApplicationId(100, 1), attempt);
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
-    return new AMInfo(appAttemptId, System.currentTimeMillis(), containerId,
-        "testhost", 2222, 3333);
-
+    return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
+        containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
   }
 }

+ 35 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java

@@ -32,8 +32,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompleti
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@@ -103,8 +106,9 @@ public class TestMRClientService {
     GetJobReportRequest gjrRequest =
         recordFactory.newRecordInstance(GetJobReportRequest.class);
     gjrRequest.setJobId(job.getID());
-    Assert.assertNotNull("JobReport is null",
-        proxy.getJobReport(gjrRequest).getJobReport());
+    JobReport jr = proxy.getJobReport(gjrRequest).getJobReport();
+    verifyJobReport(jr);
+    
 
     GetTaskAttemptCompletionEventsRequest gtaceRequest =
         recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
@@ -123,8 +127,10 @@ public class TestMRClientService {
     GetTaskAttemptReportRequest gtarRequest =
         recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
     gtarRequest.setTaskAttemptId(attempt.getID());
-    Assert.assertNotNull("TaskAttemptReport is null", 
-        proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport());
+    TaskAttemptReport tar =
+        proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport();
+    verifyTaskAttemptReport(tar);
+    
 
     GetTaskReportRequest gtrRequest =
         recordFactory.newRecordInstance(GetTaskReportRequest.class);
@@ -164,6 +170,31 @@ public class TestMRClientService {
     app.waitForState(job, JobState.SUCCEEDED);
   }
 
+  private void verifyJobReport(JobReport jr) {
+    Assert.assertNotNull("JobReport is null", jr);
+    List<AMInfo> amInfos = jr.getAMInfos();
+    Assert.assertEquals(1, amInfos.size());
+    Assert.assertEquals(JobState.RUNNING, jr.getJobState());
+    AMInfo amInfo = amInfos.get(0);
+    Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+    Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+    Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
+    Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
+        .getAttemptId());
+    Assert.assertTrue(amInfo.getStartTime() > 0);
+  }
+  
+  private void verifyTaskAttemptReport(TaskAttemptReport tar) {
+    Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState());
+    Assert.assertNotNull("TaskAttemptReport is null", tar);
+    Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost());
+    Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort());
+    Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort());
+    Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId()
+        .getAttemptId());
+  }
+  
   class MRAppWithClientService extends MRApp {
     MRClientService clientService = null;
     MRAppWithClientService(int maps, int reduces, boolean autoComplete) {

+ 10 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -117,8 +117,8 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
-            0, 0, 0, 0, 0, 0, "jobfile"));
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+            0, 0, 0, 0, 0, 0, "jobfile", null));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -194,8 +194,8 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
-            0, 0, 0, 0, 0, 0, "jobfile"));
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -260,8 +260,8 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
-            0, 0, 0, 0, 0, 0, "jobfile"));
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -374,8 +374,8 @@ public class TestRMContainerAllocator {
     @Override
     public JobReport getReport() {
       return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
-          JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
-          this.reduceProgress, this.cleanupProgress, "jobfile");
+          JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
+          this.reduceProgress, this.cleanupProgress, "jobfile", null);
     }
   }
 
@@ -510,8 +510,8 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
-            0, 0, 0, 0, 0, 0, "jobfile"));
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -39,10 +39,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@@ -221,9 +221,9 @@ public class TestRecovery {
           .getAttemptId());
       Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
           .getApplicationAttemptId());
-      Assert.assertEquals("testhost", amInfo.getNodeManagerHost());
-      Assert.assertEquals(2222, amInfo.getNodeManagerPort());
-      Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
     }
     long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
     long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;

+ 38 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/AMInfo.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.api.records;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public interface AMInfo {
+  public ApplicationAttemptId getAppAttemptId();
+  public long getStartTime();
+  public ContainerId getContainerId();
+  public String getNodeManagerHost();
+  public int getNodeManagerPort();
+  public int getNodeManagerHttpPort();
+
+  public void setAppAttemptId(ApplicationAttemptId appAttemptId);
+  public void setStartTime(long startTime);
+  public void setContainerId(ContainerId containerId);
+  public void setNodeManagerHost(String nmHost);
+  public void setNodeManagerPort(int nmPort);
+  public void setNodeManagerHttpPort(int mnHttpPort);
+}

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.api.records;
 
+import java.util.List;
+
 public interface JobReport {
   public abstract JobId getJobId();
   public abstract JobState getJobState();
@@ -25,6 +27,7 @@ public interface JobReport {
   public abstract float getReduceProgress();
   public abstract float getCleanupProgress();
   public abstract float getSetupProgress();
+  public abstract long getSubmitTime();
   public abstract long getStartTime();
   public abstract long getFinishTime();
   public abstract String getUser();
@@ -32,6 +35,7 @@ public interface JobReport {
   public abstract String getTrackingUrl();
   public abstract String getDiagnostics();
   public abstract String getJobFile();
+  public abstract List<AMInfo> getAMInfos();
 
   public abstract void setJobId(JobId jobId);
   public abstract void setJobState(JobState jobState);
@@ -39,6 +43,7 @@ public interface JobReport {
   public abstract void setReduceProgress(float progress);
   public abstract void setCleanupProgress(float progress);
   public abstract void setSetupProgress(float progress);
+  public abstract void setSubmitTime(long submitTime);
   public abstract void setStartTime(long startTime);
   public abstract void setFinishTime(long finishTime);
   public abstract void setUser(String user);
@@ -46,4 +51,5 @@ public interface JobReport {
   public abstract void setTrackingUrl(String trackingUrl);
   public abstract void setDiagnostics(String diagnostics);
   public abstract void setJobFile(String jobFile);
+  public abstract void setAMInfos(List<AMInfo> amInfos);
 }

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.api.records;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
 public interface TaskAttemptReport {
   public abstract TaskAttemptId getTaskAttemptId();
   public abstract TaskAttemptState getTaskAttemptState();
@@ -32,6 +34,10 @@ public interface TaskAttemptReport {
   public abstract String getDiagnosticInfo();
   public abstract String getStateString();
   public abstract Phase getPhase();
+  public abstract String getNodeManagerHost();
+  public abstract int getNodeManagerPort();
+  public abstract int getNodeManagerHttpPort();
+  public abstract ContainerId getContainerId();
 
   public abstract void setTaskAttemptId(TaskAttemptId taskAttemptId);
   public abstract void setTaskAttemptState(TaskAttemptState taskAttemptState);
@@ -42,6 +48,10 @@ public interface TaskAttemptReport {
   public abstract void setDiagnosticInfo(String diagnosticInfo);
   public abstract void setStateString(String stateString);
   public abstract void setPhase(Phase phase);
+  public abstract void setNodeManagerHost(String nmHost);
+  public abstract void setNodeManagerPort(int nmPort);
+  public abstract void setNodeManagerHttpPort(int nmHttpPort);
+  public abstract void setContainerId(ContainerId containerId);
   
   /** 
    * Set the shuffle finish time. Applicable only for reduce attempts

+ 201 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/AMInfoPBImpl.java

@@ -0,0 +1,201 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
+
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+
+public class AMInfoPBImpl extends ProtoBase<AMInfoProto> implements AMInfo {
+
+  AMInfoProto proto = AMInfoProto.getDefaultInstance();
+  AMInfoProto.Builder builder = null;
+  boolean viaProto = false;
+  
+  private ApplicationAttemptId appAttemptId;
+  private ContainerId containerId;
+  
+  public AMInfoPBImpl() {
+    builder = AMInfoProto.newBuilder();
+  }
+
+  public AMInfoPBImpl(AMInfoProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public synchronized AMInfoProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (this.appAttemptId != null
+        && !((ApplicationAttemptIdPBImpl) this.appAttemptId).getProto().equals(
+            builder.getApplicationAttemptId())) {
+      builder.setApplicationAttemptId(convertToProtoFormat(this.appAttemptId));
+    }
+    if (this.getContainerId() != null
+        && !((ContainerIdPBImpl) this.containerId).getProto().equals(
+            builder.getContainerId())) {
+      builder.setContainerId(convertToProtoFormat(this.containerId));
+    }
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AMInfoProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public synchronized ApplicationAttemptId getAppAttemptId() {
+    AMInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (appAttemptId != null) {
+      return appAttemptId;
+    } // Else via proto
+    if (!p.hasApplicationAttemptId()) {
+      return null;
+    }
+    appAttemptId = convertFromProtoFormat(p.getApplicationAttemptId());
+    return appAttemptId;
+  }
+
+  @Override
+  public synchronized void setAppAttemptId(ApplicationAttemptId appAttemptId) {
+    maybeInitBuilder();
+    if (appAttemptId == null) {
+      builder.clearApplicationAttemptId();
+    }
+    this.appAttemptId = appAttemptId;
+  }
+
+  @Override
+  public synchronized long getStartTime() {
+    AMInfoProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getStartTime());
+  }
+  
+  @Override
+  public synchronized void setStartTime(long startTime) {
+    maybeInitBuilder();
+    builder.setStartTime(startTime);
+  }
+
+  @Override
+  public synchronized ContainerId getContainerId() {
+    AMInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (containerId != null) {
+      return containerId;
+    } // Else via proto
+    if (!p.hasContainerId()) {
+      return null;
+    }
+    containerId = convertFromProtoFormat(p.getContainerId());
+    return containerId;
+  }
+  
+  @Override
+  public synchronized void setContainerId(ContainerId containerId) {
+    maybeInitBuilder();
+    if (containerId == null) {
+      builder.clearContainerId();
+    }
+    this.containerId = containerId;
+  }
+
+  @Override
+  public synchronized String getNodeManagerHost() {
+    AMInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeManagerHost()) {
+      return null;
+    }
+    return p.getNodeManagerHost();
+  }
+
+  @Override
+  public synchronized void setNodeManagerHost(String nmHost) {
+    maybeInitBuilder();
+    if (nmHost == null) {
+      builder.clearNodeManagerHost();
+      return;
+    }
+    builder.setNodeManagerHost(nmHost);
+  }
+
+  @Override
+  public synchronized int getNodeManagerPort() {
+    AMInfoProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getNodeManagerPort());
+  }
+  
+  @Override
+  public synchronized void setNodeManagerPort(int nmPort) {
+    maybeInitBuilder();
+    builder.setNodeManagerPort(nmPort);
+  }
+
+  @Override
+  public synchronized int getNodeManagerHttpPort() {
+    AMInfoProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getNodeManagerHttpPort();
+  }
+
+  @Override
+  public synchronized void setNodeManagerHttpPort(int httpPort) {
+    maybeInitBuilder();
+    builder.setNodeManagerHttpPort(httpPort);
+  }
+
+  private ApplicationAttemptIdPBImpl convertFromProtoFormat(
+      ApplicationAttemptIdProto p) {
+    return new ApplicationAttemptIdPBImpl(p);
+  }
+
+  private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+    return new ContainerIdPBImpl(p);
+  }
+
+  private
+      ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) {
+    return ((ApplicationAttemptIdPBImpl) t).getProto();
+  }
+
+  private ContainerIdProto convertToProtoFormat(ContainerId t) {
+    return ((ContainerIdPBImpl) t).getProto();
+  }
+}

+ 106 - 34
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java

@@ -19,9 +19,14 @@
 package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
 
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder;
@@ -31,12 +36,14 @@ import org.apache.hadoop.yarn.api.records.ProtoBase;
 
 
     
-public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobReport {
+public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
+    JobReport {
   JobReportProto proto = JobReportProto.getDefaultInstance();
   JobReportProto.Builder builder = null;
   boolean viaProto = false;
   
   private JobId jobId = null;
+  private List<AMInfo> amInfos = null;
   
   
   public JobReportPBImpl() {
@@ -48,20 +55,23 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
     viaProto = true;
   }
   
-  public JobReportProto getProto() {
+  public synchronized JobReportProto getProto() {
       mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
   }
 
-  private void mergeLocalToBuilder() {
+  private synchronized void mergeLocalToBuilder() {
     if (this.jobId != null) {
       builder.setJobId(convertToProtoFormat(this.jobId));
     }
+    if (this.amInfos != null) {
+      addAMInfosToProto();
+    }
   }
 
-  private void mergeLocalToProto() {
+  private synchronized void mergeLocalToProto() {
     if (viaProto) 
       maybeInitBuilder();
     mergeLocalToBuilder();
@@ -69,7 +79,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
     viaProto = true;
   }
 
-  private void maybeInitBuilder() {
+  private synchronized void maybeInitBuilder() {
     if (viaProto || builder == null) {
       builder = JobReportProto.newBuilder(proto);
     }
@@ -78,7 +88,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
     
   
   @Override
-  public JobId getJobId() {
+  public synchronized JobId getJobId() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     if (this.jobId != null) {
       return this.jobId;
@@ -91,14 +101,14 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
   }
 
   @Override
-  public void setJobId(JobId jobId) {
+  public synchronized void setJobId(JobId jobId) {
     maybeInitBuilder();
     if (jobId == null) 
       builder.clearJobId();
     this.jobId = jobId;
   }
   @Override
-  public JobState getJobState() {
+  public synchronized JobState getJobState() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasJobState()) {
       return null;
@@ -107,7 +117,7 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
   }
 
   @Override
-  public void setJobState(JobState jobState) {
+  public synchronized void setJobState(JobState jobState) {
     maybeInitBuilder();
     if (jobState == null) {
       builder.clearJobState();
@@ -116,132 +126,197 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
     builder.setJobState(convertToProtoFormat(jobState));
   }
   @Override
-  public float getMapProgress() {
+  public synchronized float getMapProgress() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getMapProgress());
   }
 
   @Override
-  public void setMapProgress(float mapProgress) {
+  public synchronized void setMapProgress(float mapProgress) {
     maybeInitBuilder();
     builder.setMapProgress((mapProgress));
   }
   @Override
-  public float getReduceProgress() {
+  public synchronized float getReduceProgress() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getReduceProgress());
   }
 
   @Override
-  public void setReduceProgress(float reduceProgress) {
+  public synchronized void setReduceProgress(float reduceProgress) {
     maybeInitBuilder();
     builder.setReduceProgress((reduceProgress));
   }
   @Override
-  public float getCleanupProgress() {
+  public synchronized float getCleanupProgress() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getCleanupProgress());
   }
 
   @Override
-  public void setCleanupProgress(float cleanupProgress) {
+  public synchronized void setCleanupProgress(float cleanupProgress) {
     maybeInitBuilder();
     builder.setCleanupProgress((cleanupProgress));
   }
   @Override
-  public float getSetupProgress() {
+  public synchronized float getSetupProgress() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getSetupProgress());
   }
 
   @Override
-  public void setSetupProgress(float setupProgress) {
+  public synchronized void setSetupProgress(float setupProgress) {
     maybeInitBuilder();
     builder.setSetupProgress((setupProgress));
   }
+
+  @Override
+  public synchronized long getSubmitTime() {
+    JobReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getSubmitTime());
+  }
+
+  @Override
+  public synchronized void setSubmitTime(long submitTime) {
+    maybeInitBuilder();
+    builder.setSubmitTime((submitTime));
+  }
+
   @Override
-  public long getStartTime() {
+  public synchronized long getStartTime() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getStartTime());
   }
 
   @Override
-  public void setStartTime(long startTime) {
+  public synchronized void setStartTime(long startTime) {
     maybeInitBuilder();
     builder.setStartTime((startTime));
   }
   @Override
-  public long getFinishTime() {
+  public synchronized long getFinishTime() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getFinishTime());
   }
 
   @Override
-  public void setFinishTime(long finishTime) {
+  public synchronized void setFinishTime(long finishTime) {
     maybeInitBuilder();
     builder.setFinishTime((finishTime));
   }
 
   @Override
-  public String getUser() {
+  public synchronized String getUser() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getUser());
   }
 
   @Override
-  public void setUser(String user) {
+  public synchronized void setUser(String user) {
     maybeInitBuilder();
     builder.setUser((user));
   }
 
   @Override
-  public String getJobName() {
+  public synchronized String getJobName() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getJobName());
   }
 
   @Override
-  public void setJobName(String jobName) {
+  public synchronized void setJobName(String jobName) {
     maybeInitBuilder();
     builder.setJobName((jobName));
   }
 
   @Override
-  public String getTrackingUrl() {
+  public synchronized String getTrackingUrl() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getTrackingUrl());
   }
 
   @Override
-  public void setTrackingUrl(String trackingUrl) {
+  public synchronized void setTrackingUrl(String trackingUrl) {
     maybeInitBuilder();
     builder.setTrackingUrl(trackingUrl);
   }
 
   @Override
-  public String getDiagnostics() {
+  public synchronized String getDiagnostics() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return p.getDiagnostics();
   }
 
   @Override
-  public void setDiagnostics(String diagnostics) {
+  public synchronized void setDiagnostics(String diagnostics) {
     maybeInitBuilder();
     builder.setDiagnostics(diagnostics);
   }
   
   @Override
-  public String getJobFile() {
+  public synchronized String getJobFile() {
     JobReportProtoOrBuilder p = viaProto ? proto : builder;
     return p.getJobFile();
   }
 
   @Override
-  public void setJobFile(String jobFile) {
+  public synchronized void setJobFile(String jobFile) {
     maybeInitBuilder();
     builder.setJobFile(jobFile);
   }
   
+  @Override
+  public synchronized List<AMInfo> getAMInfos() {
+    initAMInfos();
+    return this.amInfos;
+  }
+  
+  @Override
+  public synchronized void setAMInfos(List<AMInfo> amInfos) {
+    maybeInitBuilder();
+    if (amInfos == null) {
+      this.builder.clearAmInfos();
+      this.amInfos = null;
+      return;
+    }
+    initAMInfos();
+    this.amInfos.clear();
+    this.amInfos.addAll(amInfos);
+  }
+  
+  
+  private synchronized void initAMInfos() {
+    if (this.amInfos != null) {
+      return;
+    }
+    JobReportProtoOrBuilder p = viaProto ? proto : builder;
+    List<AMInfoProto> list = p.getAmInfosList();
+    
+    this.amInfos = new ArrayList<AMInfo>();
+
+    for (AMInfoProto amInfoProto : list) {
+      this.amInfos.add(convertFromProtoFormat(amInfoProto));
+    }
+  }
+
+  private synchronized void addAMInfosToProto() {
+    maybeInitBuilder();
+    builder.clearAmInfos();
+    if (this.amInfos == null)
+      return;
+    for (AMInfo amInfo : this.amInfos) {
+      builder.addAmInfos(convertToProtoFormat(amInfo));
+    }
+  }
+
+  private AMInfoPBImpl convertFromProtoFormat(AMInfoProto p) {
+    return new AMInfoPBImpl(p);
+  }
+
+  private AMInfoProto convertToProtoFormat(AMInfo t) {
+    return ((AMInfoPBImpl)t).getProto();
+  }
+
   private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
     return new JobIdPBImpl(p);
   }
@@ -257,7 +332,4 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
   private JobState convertFromProtoFormat(JobStateProto e) {
     return MRProtoUtils.convertFromProtoFormat(e);
   }
-
-
-
 }  

+ 80 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java

@@ -31,7 +31,10 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptReportProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptReportProtoOrBuilder;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptStateProto;
 import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 
 
     
@@ -42,6 +45,7 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
   
   private TaskAttemptId taskAttemptId = null;
   private Counters counters = null;
+  private ContainerId containerId = null;
   
   
   public TaskAttemptReportPBImpl() {
@@ -67,6 +71,9 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
     if (this.counters != null) {
       builder.setCounters(convertToProtoFormat(this.counters));
     }
+    if (this.containerId != null) {
+      builder.setContainerId(convertToProtoFormat(this.containerId));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -255,7 +262,80 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
     }
     builder.setPhase(convertToProtoFormat(phase));
   }
+  
+  @Override
+  public String getNodeManagerHost() {
+    TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeManagerHost()) {
+      return null;
+    }
+    return p.getNodeManagerHost();
+  }
+  
+  @Override
+  public void setNodeManagerHost(String nmHost) {
+    maybeInitBuilder();
+    if (nmHost == null) {
+      builder.clearNodeManagerHost();
+      return;
+    }
+    builder.setNodeManagerHost(nmHost);
+  }
+  
+  @Override
+  public int getNodeManagerPort() {
+    TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getNodeManagerPort());
+  }
+  
+  @Override
+  public void setNodeManagerPort(int nmPort) {
+    maybeInitBuilder();
+    builder.setNodeManagerPort(nmPort);
+  }
+  
+  @Override
+  public int getNodeManagerHttpPort() {
+    TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getNodeManagerHttpPort());
+  }
+  
+  @Override
+  public void setNodeManagerHttpPort(int nmHttpPort) {
+    maybeInitBuilder();
+    builder.setNodeManagerHttpPort(nmHttpPort);
+  }
+  
+  @Override
+  public ContainerId getContainerId() {
+    TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (containerId != null) {
+      return containerId;
+    } // Else via proto
+    if (!p.hasContainerId()) {
+      return null;
+    }
+    containerId = convertFromProtoFormat(p.getContainerId());
+    return containerId;
+  }
 
+  @Override
+  public void setContainerId(ContainerId containerId) {
+    maybeInitBuilder();
+    if (containerId == null) {
+      builder.clearContainerId();
+    }
+    this.containerId = containerId;
+  }
+
+  private ContainerIdProto convertToProtoFormat(ContainerId t) {
+    return ((ContainerIdPBImpl)t).getProto();
+  }
+  
+  private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+    return new ContainerIdPBImpl(p);
+  }
+  
   private CountersPBImpl convertFromProtoFormat(CountersProto p) {
     return new CountersPBImpl(p);
   }

+ 22 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java

@@ -18,13 +18,18 @@
 
 package org.apache.hadoop.mapreduce.v2.util;
 
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.Records;
 
 public class MRBuilderUtils {
@@ -53,14 +58,15 @@ public class MRBuilderUtils {
   }
 
   public static JobReport newJobReport(JobId jobId, String jobName,
-      String userName, JobState state, long startTime, long finishTime,
+      String userName, JobState state, long submitTime, long startTime, long finishTime,
       float setupProgress, float mapProgress, float reduceProgress,
-      float cleanupProgress, String jobFile) {
+      float cleanupProgress, String jobFile, List<AMInfo> amInfos) {
     JobReport report = Records.newRecord(JobReport.class);
     report.setJobId(jobId);
     report.setJobName(jobName);
     report.setUser(userName);
     report.setJobState(state);
+    report.setSubmitTime(submitTime);
     report.setStartTime(startTime);
     report.setFinishTime(finishTime);
     report.setSetupProgress(setupProgress);
@@ -68,6 +74,20 @@ public class MRBuilderUtils {
     report.setMapProgress(mapProgress);
     report.setReduceProgress(reduceProgress);
     report.setJobFile(jobFile);
+    report.setAMInfos(amInfos);
     return report;
   }
+
+  public static AMInfo newAMInfo(ApplicationAttemptId appAttemptId,
+      long startTime, ContainerId containerId, String nmHost, int nmPort,
+      int nmHttpPort) {
+    AMInfo amInfo = Records.newRecord(AMInfo.class);
+    amInfo.setAppAttemptId(appAttemptId);
+    amInfo.setStartTime(startTime);
+    amInfo.setContainerId(containerId);
+    amInfo.setNodeManagerHost(nmHost);
+    amInfo.setNodeManagerPort(nmPort);
+    amInfo.setNodeManagerHttpPort(nmHttpPort);
+    return amInfo;
+  }
 }

+ 15 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto

@@ -119,6 +119,10 @@ message TaskAttemptReportProto {
   optional PhaseProto phase = 9;
   optional int64 shuffle_finish_time = 10;
   optional int64 sort_finish_time=11;
+  optional string node_manager_host = 12;
+  optional int32 node_manager_port = 13;
+  optional int32 node_manager_http_port = 14;
+  optional ContainerIdProto container_id = 15;
 }
 
 enum JobStateProto {
@@ -146,6 +150,17 @@ message JobReportProto {
   optional string trackingUrl = 11;
   optional string diagnostics = 12;
   optional string jobFile = 13;
+  repeated AMInfoProto am_infos = 14;
+  optional int64 submit_time = 15;
+}
+
+message AMInfoProto {
+  optional ApplicationAttemptIdProto application_attempt_id = 1;
+  optional int64 start_time = 2;
+  optional ContainerIdProto container_id = 3;
+  optional string node_manager_host = 4;
+  optional int32 node_manager_port = 5;
+  optional int32 node_manager_http_port = 6;
 }
 
 enum TaskAttemptCompletionEventStatusProto {

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml

@@ -34,6 +34,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId> 
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId> 
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>

+ 15 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -212,7 +213,20 @@ public class Cluster {
       throws IOException, InterruptedException {
     return client.getQueue(name);
   }
-  
+
+  /**
+   * Get log parameters for the specified jobID or taskAttemptID
+   * @param jobID the job id.
+   * @param taskAttemptID the task attempt id. Optional.
+   * @return the LogParams
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    return client.getLogFileParams(jobID, taskAttemptID);
+  }
+
   /**
    * Get current cluster status.
    * 

+ 15 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -115,6 +116,8 @@ public interface ClientProtocol extends VersionedProtocol {
    *             MAPREDUCE-2337.
    * Version 37: More efficient serialization format for framework counters
    *             (MAPREDUCE-901)
+   * Version 38: Added getLogFilePath(JobID, TaskAttemptID) as part of 
+   *             MAPREDUCE-3146
    */
   public static final long versionID = 37L;
 
@@ -351,4 +354,16 @@ public interface ClientProtocol extends VersionedProtocol {
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                     ) throws IOException,
                                              InterruptedException;
+  
+  /**
+   * Gets the location of the log file for a job if no taskAttemptId is
+   * specified, otherwise gets the log location for the taskAttemptId.
+   * @param jobID the jobId.
+   * @param taskAttemptID the taskAttemptId.
+   * @return log params.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException;
 }

+ 38 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java

@@ -42,9 +42,11 @@ import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper;
 
 /**
  * Interprets the map reduce cli options 
@@ -95,6 +97,7 @@ public class CLI extends Configured implements Tool {
     boolean killTask = false;
     boolean failTask = false;
     boolean setJobPriority = false;
+    boolean logs = false;
 
     if ("-submit".equals(cmd)) {
       if (argv.length != 2) {
@@ -205,6 +208,19 @@ public class CLI extends Configured implements Tool {
       taskType = argv[2];
       taskState = argv[3];
       displayTasks = true;
+    } else if ("-logs".equals(cmd)) {
+      if (argv.length == 2 || argv.length ==3) {
+        logs = true;
+        jobid = argv[1];
+        if (argv.length == 3) {
+          taskid = argv[2];
+        }  else {
+          taskid = null;
+        }
+      } else {
+        displayUsage(cmd);
+        return exitCode;
+      }
     } else {
       displayUsage(cmd);
       return exitCode;
@@ -313,6 +329,22 @@ public class CLI extends Configured implements Tool {
           System.out.println("Could not fail task " + taskid);
           exitCode = -1;
         }
+      } else if (logs) {
+        try {
+        JobID jobID = JobID.forName(jobid);
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
+        LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
+        LogDumper logDumper = new LogDumper();
+        logDumper.setConf(getConf());
+        logDumper.dumpAContainersLogs(logParams.getApplicationId(),
+            logParams.getContainerId(), logParams.getNodeId(),
+            logParams.getOwner());
+        } catch (IOException e) {
+          if (e instanceof RemoteException) {
+            throw e;
+          } 
+          System.out.println(e.getMessage());
+        }
       }
     } catch (RemoteException re) {
       IOException unwrappedException = re.unwrapRemoteException();
@@ -380,6 +412,10 @@ public class CLI extends Configured implements Tool {
           " <job-id> <task-type> <task-state>]. " +
           "Valid values for <task-type> are " + taskTypes + ". " +
           "Valid values for <task-state> are " + taskStates);
+    } else if ("-logs".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd +
+          " <job-id> <task-attempt-id>]. " +
+          " <task-attempt-id> is optional to get task attempt logs.");      
     } else {
       System.err.printf(prefix + "<command> <args>\n");
       System.err.printf("\t[-submit <job-file>]\n");
@@ -398,7 +434,8 @@ public class CLI extends Configured implements Tool {
         "Valid values for <task-type> are " + taskTypes + ". " +
         "Valid values for <task-state> are " + taskStates);
       System.err.printf("\t[-kill-task <task-attempt-id>]\n");
-      System.err.printf("\t[-fail-task <task-attempt-id>]\n\n");
+      System.err.printf("\t[-fail-task <task-attempt-id>]\n");
+      System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
       ToolRunner.printGenericCommandUsage(System.out);
     }
   }

+ 67 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/v2/LogParams.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2;
+
+public class LogParams {
+
+  private String containerId;
+  private String applicationId;
+  private String nodeId;
+  private String owner;
+
+  public LogParams(String containerIdStr, String applicationIdStr,
+      String nodeIdStr, String owner) {
+    this.containerId = containerIdStr;
+    this.applicationId = applicationIdStr;
+    this.nodeId = nodeIdStr;
+    this.owner = owner;
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+
+  public void setContainerId(String containerId) {
+    this.containerId = containerId;
+  }
+
+  public String getApplicationId() {
+    return applicationId;
+  }
+
+  public void setApplicationId(String applicationId) {
+    this.applicationId = applicationId;
+  }
+
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+
+  public String getOwner() {
+    return this.owner;
+  }
+
+  public String setOwner(String owner) {
+    return this.owner;
+  }
+}

+ 16 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java

@@ -36,8 +36,8 @@ import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.YarnException;
@@ -94,6 +95,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
             JobReport.class);
     report.setJobId(jobId);
     report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
+    report.setSubmitTime(jobInfo.getSubmitTime());
     report.setStartTime(jobInfo.getLaunchTime());
     report.setFinishTime(jobInfo.getFinishTime());
     report.setJobName(jobInfo.getJobname());
@@ -103,6 +105,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
     report.setJobFile(confFile.toString());
     report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
         .toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
+    report.setAMInfos(getAMInfos());
   }
 
   @Override
@@ -341,6 +344,17 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
 
   @Override
   public List<AMInfo> getAMInfos() {
-    return jobInfo.getAMInfos();
+    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+        .getAMInfos()) {
+      AMInfo amInfo =
+          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+              jhAmInfo.getNodeManagerHttpPort());
+   
+      amInfos.add(amInfo);
+    }
+    return amInfos;
   }
 }

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java

@@ -79,6 +79,15 @@ public class CompletedTaskAttempt implements TaskAttempt {
 //    report.setPhase(attemptInfo.get); //TODO
     report.setStateString(attemptInfo.getState());
     report.setCounters(getCounters());
+    report.setContainerId(attemptInfo.getContainerId());
+    String []hostSplits = attemptInfo.getHostname().split(":");
+    if (hostSplits.length != 2) {
+      report.setNodeManagerHost("UNKNOWN");
+    } else {
+      report.setNodeManagerHost(hostSplits[0]);
+      report.setNodeManagerPort(Integer.parseInt(hostSplits[1]));
+    }
+    report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
   }
 
   @Override

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java

@@ -23,7 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java

@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;

+ 3 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

@@ -112,11 +112,11 @@ public class TestJobHistoryParsing {
 
     // Verify aminfo
     Assert.assertEquals(1, jobInfo.getAMInfos().size());
-    Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0)
+    Assert.assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0)
         .getNodeManagerHost());
     AMInfo amInfo = jobInfo.getAMInfos().get(0);
-    Assert.assertEquals(2222, amInfo.getNodeManagerPort());
-    Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
+    Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+    Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
     Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
     Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
         .getApplicationAttemptId());

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java

@@ -218,7 +218,8 @@ public class TestHSWebApp {
 
     params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
         .toString());
-    params.put(NM_NODENAME, BuilderUtils.newNodeId("testhost", 2222).toString());
+    params.put(NM_NODENAME, 
+        BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
     params.put(ENTITY_STRING, "container_10_0001_01_000001");
     params.put(APP_OWNER, "owner");
 
@@ -229,7 +230,8 @@ public class TestHSWebApp {
     verify(spyPw).write(
         "Logs not available for container_10_0001_01_000001. Aggregation "
             + "may not be complete,"
-            + " Check back later or try the nodemanager on testhost:2222");
+            + " Check back later or try the nodemanager on "
+            + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
   }
 }
   

+ 55 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 
@@ -37,6 +38,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -47,13 +49,17 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -68,6 +74,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -398,5 +405,52 @@ public class ClientServiceDelegate {
     return true;
   }
 
+  public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+      throws YarnRemoteException, IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+        TypeConverter.toYarn(oldJobID);
+    GetJobReportRequest request =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    request.setJobId(jobId);
 
-}
+    JobReport report =
+        ((GetJobReportResponse) invoke("getJobReport",
+            GetJobReportRequest.class, request)).getJobReport();
+    if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
+        JobState.ERROR).contains(report.getJobState())) {
+      if (oldTaskAttemptID != null) {
+        GetTaskAttemptReportRequest taRequest =
+            recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+        taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
+        TaskAttemptReport taReport =
+            ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+                GetTaskAttemptReportRequest.class, taRequest))
+                .getTaskAttemptReport();
+        if (taReport.getContainerId() == null
+            || taReport.getNodeManagerHost() == null) {
+          throw new IOException("Unable to get log information for task: "
+              + oldTaskAttemptID);
+        }
+        return new LogParams(
+            taReport.getContainerId().toString(),
+            taReport.getContainerId().getApplicationAttemptId()
+                .getApplicationId().toString(),
+            BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
+                taReport.getNodeManagerPort()).toString(), report.getUser());
+      } else {
+        if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
+          throw new IOException("Unable to get log information for job: "
+              + oldJobID);
+        }
+        AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
+        return new LogParams(
+            amInfo.getContainerId().toString(),
+            amInfo.getAppAttemptId().getApplicationId().toString(),
+            BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
+                amInfo.getNodeManagerPort()).toString(), report.getUser());
+      }
+    } else {
+      throw new IOException("Cannot get log path for a in-progress job");
+    }
+  }
+}

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
@@ -504,4 +505,10 @@ public class YARNRunner implements ClientProtocol {
     return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
         clientMethodsHash);
   }
+
+  @Override
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException {
+    return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+  }
 }

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

@@ -168,7 +168,7 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
     when(jobReportResponse1.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
 
     // First AM returns a report with jobName firstGen and simulates AM shutdown
     // on second invocation.
@@ -180,7 +180,7 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
     when(jobReportResponse2.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
 
     // Second AM generation returns a report with jobName secondGen
     MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);

+ 47 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java

@@ -20,13 +20,13 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import junit.framework.Assert;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.FailingMapper;
 import org.apache.hadoop.SleepJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +35,20 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Test;
@@ -105,6 +117,8 @@ public class TestMRJobsWithHistoryService {
       return;
     }
 
+
+    
     SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(mrCluster.getConfig());
     // Job with 3 maps and 2 reduces
@@ -113,7 +127,8 @@ public class TestMRJobsWithHistoryService {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.waitForCompletion(true);
     Counters counterMR = job.getCounters();
-    ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId();
+    JobId jobId = TypeConverter.toYarn(job.getJobID());
+    ApplicationId appID = jobId.getAppId();
     while (true) {
       Thread.sleep(1000);
       if (mrCluster.getResourceManager().getRMContext().getRMApps()
@@ -126,6 +141,36 @@ public class TestMRJobsWithHistoryService {
     LOG.info("CounterHS " + counterHS);
     LOG.info("CounterMR " + counterMR);
     Assert.assertEquals(counterHS, counterMR);
+    
+    MRClientProtocol historyClient = instantiateHistoryProxy();
+    GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
+    gjReq.setJobId(jobId);
+    JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
+    verifyJobReport(jobReport, jobId);
   }
 
+  private void verifyJobReport(JobReport jobReport, JobId jobId) {
+    List<AMInfo> amInfos = jobReport.getAMInfos();
+    Assert.assertEquals(1, amInfos.size());
+    AMInfo amInfo = amInfos.get(0);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(jobId.getAppId(), 1);
+    ContainerId amContainerId = BuilderUtils.newContainerId(appAttemptId, 1);  
+    Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
+    Assert.assertEquals(amContainerId, amInfo.getContainerId());
+    Assert.assertTrue(jobReport.getSubmitTime() > 0);
+    Assert.assertTrue(jobReport.getStartTime() > 0
+        && jobReport.getStartTime() >= jobReport.getSubmitTime());
+    Assert.assertTrue(jobReport.getFinishTime() > 0
+        && jobReport.getFinishTime() >= jobReport.getStartTime());
+  }
+  
+  private MRClientProtocol instantiateHistoryProxy() {
+    final String serviceAddr =
+        mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
+    final YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol historyClient =
+        (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig());
+    return historyClient;
+  }
 }

+ 22 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java

@@ -126,6 +126,14 @@ public class ConverterUtils {
     return appAttemptId;
   }
 
+  private static ApplicationId toApplicationId(
+      Iterator<String> it) throws NumberFormatException {
+    ApplicationId appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(Long.parseLong(it.next()));
+    appId.setId(Integer.parseInt(it.next()));
+    return appId;
+  }
+
   public static String toString(ContainerId cId) {
     return cId.toString();
   }
@@ -178,4 +186,18 @@ public class ConverterUtils {
     }
   }
   
+  public static ApplicationId toApplicationId(
+      String appIdStr) {
+    Iterator<String> it = _split(appIdStr).iterator();
+    if (!it.next().equals(APPLICATION_PREFIX)) {
+      throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+          + appIdStr);
+    }
+    try {
+      return toApplicationId(it);
+    } catch (NumberFormatException n) {
+      throw new IllegalArgumentException("Invalid AppAttemptId: "
+          + appIdStr, n);
+    }
+  }
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -140,7 +140,7 @@ public class LogAggregationService extends AbstractService implements
     }
     super.stop();
   }
-
+  
   /**
    * Constructs the full filename for an application's log file per node.
    * @param remoteRootLogDir

+ 27 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java

@@ -50,6 +50,7 @@ public class LogDumper extends Configured implements Tool {
   private static final String CONTAINER_ID_OPTION = "containerId";
   private static final String APPLICATION_ID_OPTION = "applicationId";
   private static final String NODE_ADDRESS_OPTION = "nodeAddress";
+  private static final String APP_OWNER_OPTION = "appOwner";
 
   @Override
   public int run(String[] args) throws Exception {
@@ -58,6 +59,7 @@ public class LogDumper extends Configured implements Tool {
     opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
     opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
     opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
+    opts.addOption(APP_OWNER_OPTION, true, "AppOwner");
 
     if (args.length < 1) {
       HelpFormatter formatter = new HelpFormatter();
@@ -69,11 +71,13 @@ public class LogDumper extends Configured implements Tool {
     String appIdStr = null;
     String containerIdStr = null;
     String nodeAddress = null;
+    String appOwner = null;
     try {
       CommandLine commandLine = parser.parse(opts, args, true);
       appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
       containerIdStr = commandLine.getOptionValue(CONTAINER_ID_OPTION);
       nodeAddress = commandLine.getOptionValue(NODE_ADDRESS_OPTION);
+      appOwner = commandLine.getOptionValue(APP_OWNER_OPTION);
     } catch (ParseException e) {
       System.out.println("options parsing failed: " + e.getMessage());
 
@@ -96,8 +100,11 @@ public class LogDumper extends Configured implements Tool {
 
     DataOutputStream out = new DataOutputStream(System.out);
 
+    if (appOwner == null || appOwner.isEmpty()) {
+      appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
+    }
     if (containerIdStr == null && nodeAddress == null) {
-      dumpAllContainersLogs(appId, out);
+      dumpAllContainersLogs(appId, appOwner, out);
     } else if ((containerIdStr == null && nodeAddress != null)
         || (containerIdStr != null && nodeAddress == null)) {
       System.out.println("ContainerId or NodeAddress cannot be null!");
@@ -113,7 +120,7 @@ public class LogDumper extends Configured implements Tool {
               LogAggregationService.getRemoteNodeLogFileForApp(
                   remoteRootLogDir,
                   appId,
-                  UserGroupInformation.getCurrentUser().getShortUserName(),
+                  appOwner,
                   ConverterUtils.toNodeId(nodeAddress),
                   getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
                       YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)));
@@ -123,6 +130,21 @@ public class LogDumper extends Configured implements Tool {
     return 0;
   }
 
+  public void dumpAContainersLogs(String appId, String containerId,
+      String nodeId, String jobOwner) throws IOException {
+    Path remoteRootLogDir =
+        new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    String suffix = LogAggregationService.getRemoteNodeLogDirSuffix(getConf());
+    AggregatedLogFormat.LogReader reader =
+        new AggregatedLogFormat.LogReader(getConf(),
+            LogAggregationService.getRemoteNodeLogFileForApp(remoteRootLogDir,
+                ConverterUtils.toApplicationId(appId), jobOwner,
+                ConverterUtils.toNodeId(nodeId), suffix));
+    DataOutputStream out = new DataOutputStream(System.out);
+    dumpAContainerLogs(containerId, reader, out);
+  }
+
   private int dumpAContainerLogs(String containerIdStr,
       AggregatedLogFormat.LogReader reader, DataOutputStream out)
       throws IOException {
@@ -152,13 +174,12 @@ public class LogDumper extends Configured implements Tool {
     return 0;
   }
 
-  private void
-      dumpAllContainersLogs(ApplicationId appId, DataOutputStream out)
-          throws IOException {
+  private void dumpAllContainersLogs(ApplicationId appId, String appOwner,
+      DataOutputStream out) throws IOException {
     Path remoteRootLogDir =
         new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String user = appOwner;
     String logDirSuffix =
         getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);

+ 8 - 0
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -4822,6 +4823,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return secretManager.renewToken(token, user);
   }
 
+  @Override
+  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Not supported by JobTracker");
+  }
+  
   JobACLsManager getJobACLsManager() {
     return aclsManager.getJobACLsManager();
   }

+ 8 - 0
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
@@ -812,4 +813,11 @@ public class LocalJobRunner implements ClientProtocol {
                                       ) throws IOException,InterruptedException{
     return 0;
   }
+
+  @Override
+  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Not supported");
+  }
 }