Browse Source

Task diagnostic info made available on the AM UI. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1136306 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 14 years ago
parent
commit
010b270a9f
18 changed files with 183 additions and 81 deletions
  1. 2 0
      mapreduce/CHANGES.txt
  2. 17 5
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
  3. 0 1
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  4. 9 2
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  5. 18 5
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  6. 5 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  7. 18 0
      mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java
  8. 17 3
      mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
  9. 1 1
      mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
  10. 11 4
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
  11. 29 16
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
  12. 7 10
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  13. 8 4
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  14. 21 18
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  15. 6 1
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
  16. 13 6
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
  17. 1 1
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
  18. 0 4
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java

+ 2 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,8 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Task diagnostic info made available on the AM UI. (vinodkv)
+
     MAPREDUCE-2598. Fix NPE and UI for JobHistory. (Siddharth Seth via llu)
 
     Fix NPE when killing/failing already killed/failed tasks. (llu)

+ 17 - 5
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java

@@ -65,8 +65,10 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -296,8 +298,11 @@ public class MRClientService extends AbstractService
     public KillJobResponse killJob(KillJobRequest request) 
       throws YarnRemoteException {
       JobId jobId = request.getJobId();
-      LOG.info("Kill Job received from client " + jobId);
-	  verifyAndGetJob(jobId, true);
+      String message = "Kill Job received from client " + jobId;
+      LOG.info(message);
+  	  verifyAndGetJob(jobId, true);
+      appContext.getEventHandler().handle(
+          new JobDiagnosticsUpdateEvent(jobId, message));
       appContext.getEventHandler().handle(
           new JobEvent(jobId, JobEventType.JOB_KILL));
       KillJobResponse response = 
@@ -309,7 +314,8 @@ public class MRClientService extends AbstractService
     public KillTaskResponse killTask(KillTaskRequest request) 
       throws YarnRemoteException {
       TaskId taskId = request.getTaskId();
-      LOG.info("Kill task received from client " + taskId);
+      String message = "Kill task received from client " + taskId;
+      LOG.info(message);
       verifyAndGetTask(taskId, true);
       appContext.getEventHandler().handle(
           new TaskEvent(taskId, TaskEventType.T_KILL));
@@ -322,8 +328,11 @@ public class MRClientService extends AbstractService
     public KillTaskAttemptResponse killTaskAttempt(
         KillTaskAttemptRequest request) throws YarnRemoteException {
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      LOG.info("Kill task attempt received from client " + taskAttemptId);
+      String message = "Kill task attempt received from client " + taskAttemptId;
+      LOG.info(message);
       verifyAndGetAttempt(taskAttemptId, true);
+      appContext.getEventHandler().handle(
+          new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
       appContext.getEventHandler().handle(
           new TaskAttemptEvent(taskAttemptId, 
               TaskAttemptEventType.TA_KILL));
@@ -348,8 +357,11 @@ public class MRClientService extends AbstractService
     public FailTaskAttemptResponse failTaskAttempt(
         FailTaskAttemptRequest request) throws YarnRemoteException {
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      LOG.info("Fail task attempt received from client " + taskAttemptId);
+      String message = "Fail task attempt received from client " + taskAttemptId;
+      LOG.info(message);
       verifyAndGetAttempt(taskAttemptId, true);
+      appContext.getEventHandler().handle(
+          new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
       appContext.getEventHandler().handle(
           new TaskAttemptEvent(taskAttemptId, 
               TaskAttemptEventType.TA_FAILMSG));

+ 0 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;

+ 9 - 2
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -151,6 +151,11 @@ public abstract class TaskAttemptImpl implements
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
+
+  private static final DiagnosticInformationUpdater 
+    DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION 
+      = new DiagnosticInformationUpdater();
+
   private static final StateMachineFactory
         <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
         stateMachineFactory
@@ -198,7 +203,7 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
      .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-         new DiagnosticInformationUpdater())
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // If no commit is required, task directly goes to success
      .addTransition(TaskAttemptState.RUNNING,
          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
@@ -232,7 +237,7 @@ public abstract class TaskAttemptImpl implements
      .addTransition(TaskAttemptState.COMMIT_PENDING,
          TaskAttemptState.COMMIT_PENDING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-         new DiagnosticInformationUpdater())
+         DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      .addTransition(TaskAttemptState.COMMIT_PENDING,
          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
@@ -260,6 +265,7 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
              TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -339,6 +345,7 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptState.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Ignore-able events for FAILED state

+ 18 - 5
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -30,8 +30,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -40,6 +42,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -47,6 +50,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
@@ -66,6 +70,7 @@ public class ContainerLauncherImpl extends AbstractService implements
   private Thread eventHandlingThread;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
+  private RecordFactory recordFactory;
 
   public ContainerLauncherImpl(AppContext context) {
     super(ContainerLauncherImpl.class.getName());
@@ -80,6 +85,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     myLocalConfig.setClass(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     super.init(myLocalConfig);
   }
 
@@ -172,6 +178,7 @@ public class ContainerLauncherImpl extends AbstractService implements
       case CONTAINER_REMOTE_LAUNCH:
         ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event;
 
+        TaskAttemptId taskAttemptID = launchEv.getTaskAttemptID();
         try {
           
           ContainerManager proxy = 
@@ -184,19 +191,24 @@ public class ContainerLauncherImpl extends AbstractService implements
           // TODO: Make sure that child's mapred-local-dir is set correctly.
 
           // Now launch the actual container
-          StartContainerRequest startRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(StartContainerRequest.class);
+          StartContainerRequest startRequest = recordFactory
+              .newRecordInstance(StartContainerRequest.class);
           startRequest.setContainerLaunchContext(containerLaunchContext);
           proxy.startContainer(startRequest);
 
           // after launching, send launched event to task attempt to move
           // it from ASSIGNED to RUNNING state
           context.getEventHandler().handle(
-              new TaskAttemptEvent(launchEv.getTaskAttemptID(),
+              new TaskAttemptEvent(taskAttemptID,
                   TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
         } catch (Exception e) {
-          LOG.error("Container launch failed", e);
+          String message = "Container launch failed for " + containerID
+              + " : " + StringUtils.stringifyException(e);
+          LOG.error(message);
           context.getEventHandler().handle(
-              new TaskAttemptEvent(launchEv.getTaskAttemptID(),
+              new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(taskAttemptID,
                   TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
         }
 
@@ -218,7 +230,8 @@ public class ContainerLauncherImpl extends AbstractService implements
             // TODO:check whether container is launched
 
             // kill the remote container if already launched
-            StopContainerRequest stopRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(StopContainerRequest.class);
+            StopContainerRequest stopRequest = recordFactory
+                .newRecordInstance(StopContainerRequest.class);
             stopRequest.setContainerId(event.getContainerID());
             proxy.stopContainer(stopRequest);
 

+ 5 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -412,6 +413,10 @@ public class RMContainerAllocator extends RMContainerRequestor
           //send the container completed event to Task attempt
           eventHandler.handle(new TaskAttemptEvent(attemptID,
               TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+          // Send the diagnostics
+          String diagnostics = cont.getContainerStatus().getDiagnostics();
+          eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(
+              attemptID, diagnostics));
         }
       }
       LOG.debug("Received Container :" + cont);

+ 18 - 0
mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java

@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.yarn.api.records;
 
 import java.util.List;

+ 17 - 3
mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.util.ProtoUtils;
 
     
 public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Container {
+
   ContainerProto proto = ContainerProto.getDefaultInstance();
   ContainerProto.Builder builder = null;
   boolean viaProto = false;
@@ -65,15 +68,26 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
   }
 
   private void mergeLocalToBuilder() {
-    if (this.containerId != null &&  !((ContainerIdPBImpl)containerId).getProto().equals(builder.getId())) {
+    if (this.containerId != null
+        && !((ContainerIdPBImpl) containerId).getProto().equals(
+            builder.getId())) {
       builder.setId(convertToProtoFormat(this.containerId));
     }
-    if (this.resource != null && !((ResourcePBImpl)this.resource).getProto().equals(builder.getResource())) {
+    if (this.resource != null
+        && !((ResourcePBImpl) this.resource).getProto().equals(
+            builder.getResource())) {
       builder.setResource(convertToProtoFormat(this.resource));
     }
-    if (this.containerToken != null && !((ContainerTokenPBImpl)this.containerToken).getProto().equals(builder.getContainerToken())) {
+    if (this.containerToken != null
+        && !((ContainerTokenPBImpl) this.containerToken).getProto().equals(
+            builder.getContainerToken())) {
       builder.setContainerToken(convertToProtoFormat(this.containerToken));
     }
+    if (this.containerStatus != null
+        && !((ContainerStatusPBImpl) this.containerStatus).getProto().equals(
+            builder.getContainerStatus())) {
+      builder.setContainerStatus(convertToProtoFormat(this.containerStatus));
+    }
   }
 
   private void mergeLocalToProto() {

+ 1 - 1
mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto

@@ -219,7 +219,7 @@ message ContainerLaunchContextProto {
 message ContainerStatusProto {
   optional ContainerIdProto container_id = 1;
   optional ContainerStateProto state = 2;
-  optional string diagnostics = 3 [default = ""];
+  optional string diagnostics = 3 [default = "N/A"];
   optional string exit_status = 4 [default = "N/A"];
 }
 

+ 11 - 4
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java

@@ -32,8 +32,10 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -89,8 +91,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
       String userName, String appId, Path containerWorkDir)
       throws IOException {
 
+    ContainerId containerId = container.getContainerID();
+
     // create container dirs on all disks
-    String containerIdStr = ConverterUtils.toString(container.getContainerID());
+    String containerIdStr = ConverterUtils.toString(containerId);
     String appIdStr =
         ConverterUtils.toString(container.getContainerID().getAppId());
     String[] sLocalDirs =
@@ -128,7 +132,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
       LOG.info("launchContainer: " + Arrays.toString(command));
       shExec = new ShellCommandExecutor(command,
           new File(containerWorkDir.toUri().getPath()));
-      launchCommandObjs.put(container.getLaunchContext().getContainerId(), shExec);
+      launchCommandObjs.put(containerId, shExec);
       shExec.execute();
     } catch (Exception e) {
       if (null == shExec) {
@@ -136,10 +140,13 @@ public class DefaultContainerExecutor extends ContainerExecutor {
       }
       int exitCode = shExec.getExitCode();
       LOG.warn("Exit code from task is : " + exitCode);
-      logOutput(shExec.getOutput());
+      String message = shExec.getOutput();
+      logOutput(message);
+      container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+          message));
       return exitCode;
     } finally {
-      launchCommandObjs.remove(container.getLaunchContext().getContainerId());
+      launchCommandObjs.remove(containerId);
     }
     return 0;
   }

+ 29 - 16
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java

@@ -29,9 +29,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -71,8 +74,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   }
 
   /**
-   * Result codes returned from the C task-controller.
-   * These must match the values in task-controller.h.
+   * Result codes returned from the C container-executor.
+   * These must match the values in container-executor.h.
    */
   enum ResultCode {
     OK(0),
@@ -137,7 +140,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
       }
     } catch (ExitCodeException e) {
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from task is : " + exitCode);
+      LOG.warn("Exit code from container is : " + exitCode);
       logOutput(shExec.getOutput());
       throw new IOException("App initialization failed (" + exitCode + ")", e);
     }
@@ -147,7 +150,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   public int launchContainer(Container container,
       Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
       String user, String appId, Path containerWorkDir) throws IOException {
-    String containerIdStr = ConverterUtils.toString(container.getContainerID());
+
+    ContainerId containerId = container.getContainerID();
+    String containerIdStr = ConverterUtils.toString(containerId);
     List<String> command = new ArrayList<String>(
       Arrays.asList(containerExecutorExe, 
                     user, 
@@ -159,34 +164,42 @@ public class LinuxContainerExecutor extends ContainerExecutor {
                     nmPrivateTokensPath.toUri().getPath().toString()));
     String[] commandArray = command.toArray(new String[command.size()]);
     ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
-    launchCommandObjs.put(container.getLaunchContext().getContainerId(), shExec);
+    launchCommandObjs.put(containerId, shExec);
     // DEBUG
     LOG.info("launchContainer: " + Arrays.toString(commandArray));
     if (LOG.isDebugEnabled()) {
       LOG.debug("launchContainer: " + Arrays.toString(commandArray));
     }
+    String output = shExec.getOutput();
     try {
       shExec.execute();
       if (LOG.isDebugEnabled()) {
-        logOutput(shExec.getOutput());
+        logOutput(output);
       }
     } catch (ExitCodeException e) {
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from task is : " + exitCode);
-      // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
+      LOG.warn("Exit code from container is : " + exitCode);
+      // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
       // terminated/killed forcefully. In all other cases, log the
-      // task-controller output
+      // container-executor's output
       if (exitCode != 143 && exitCode != 137) {
-        LOG.warn("Exception thrown while launching task JVM : ", e);
-        logOutput(shExec.getOutput());
+        LOG.warn("Exception from container-launch : ", e);
+        logOutput(output);
+        String diagnostics = "Exception from container-launch: \n"
+            + StringUtils.stringifyException(e) + "\n" + output;
+        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+            diagnostics));
+      } else {
+        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+            "Container killed on request. Exit code is " + exitCode));
       }
       return exitCode;
     } finally {
-      launchCommandObjs.remove(container.getLaunchContext().getContainerId());
+      launchCommandObjs.remove(containerId);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Output from LinuxContainerExecutor's launchTask follows:");
-      logOutput(shExec.getOutput());
+      LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
+      logOutput(output);
     }
     return 0;
   }
@@ -203,7 +216,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
                    Integer.toString(signal.getValue()) };
     ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("signalTask: " + Arrays.toString(command));
+      LOG.debug("signalContainer: " + Arrays.toString(command));
     }
     try {
       shExec.execute();
@@ -248,7 +261,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
       }
     } catch (IOException e) {
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from task is : " + exitCode);
+      LOG.warn("Exit code from container is : " + exitCode);
       if (exitCode != 0) {
         LOG.error("DeleteAsUser for " + dir.toUri().getPath()
             + " returned with non-zero exit code" + exitCode);

+ 7 - 10
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -189,11 +189,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   }
 
   private NodeStatus getNodeStatus() {
-    NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
-    status.setNodeId(this.nodeId);
 
-    Map<String, List<org.apache.hadoop.yarn.api.records.Container>> activeContainers =
-        status.getAllContainers();
+    NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
+    nodeStatus.setNodeId(this.nodeId);
 
     int numActiveContainers = 0;
     synchronized (this.context.getContainers()) {
@@ -204,12 +202,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         Container container = e.getValue();
         String applicationId = String.valueOf(containerId.getAppId().getId()); // TODO: ID? Really?
 
-        List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = status.getContainers(applicationId);
-            activeContainers.get(applicationId);
+        List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = nodeStatus
+            .getContainers(applicationId);
         if (applicationContainers == null) {
           applicationContainers = new ArrayList<org.apache.hadoop.yarn.api.records.Container>();
-          status.setContainers(applicationId, applicationContainers);
-//          activeContainers.put(applicationId, applicationContainers);
+          nodeStatus.setContainers(applicationId, applicationContainers);
         }
 
         // Clone the container to send it to the RM
@@ -238,9 +235,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
     LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
         + ", " + nodeHealthStatus.getHealthReport());
-    status.setNodeHealthStatus(nodeHealthStatus);
+    nodeStatus.setNodeHealthStatus(nodeHealthStatus);
 
-    return status;
+    return nodeStatus;
   }
 
   @Override

+ 8 - 4
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -145,7 +145,7 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(auxiluaryServices);
 
     this.containersMonitor =
-        new ContainersMonitorImpl(exec, dispatcher);
+        new ContainersMonitorImpl(exec, dispatcher, this.context);
     addService(this.containersMonitor);
 
     LogAggregationService logAggregationService =
@@ -228,15 +228,18 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     super.stop();
   }
-  
+
+  /**
+   * Start a container on this NodeManager.
+   */
   @Override
   public StartContainerResponse startContainer(StartContainerRequest request)
       throws YarnRemoteException {
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
 
     LOG.info(" container is " + request);
-
-    // parse credentials
+  
+    // //////////// Parse credentials
     ByteBuffer tokens = launchContext.getContainerTokens();
     Credentials credentials = new Credentials();
     if (tokens != null) {
@@ -255,6 +258,7 @@ public class ContainerManagerImpl extends CompositeService implements
         throw RPCUtil.getRemoteException(e);
       }
     }
+    // //////////// End of parsing credentials
 
     Container container =
         new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);

+ 21 - 18
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -87,6 +86,7 @@ public class ContainerLaunch implements Callable<Integer> {
   public Integer call() {
     final ContainerLaunchContext launchContext = container.getLaunchContext();
     final Map<Path,String> localResources = container.getLocalizedResources();
+    String containerIdStr = ConverterUtils.toString(container.getContainerID());
     final String user = launchContext.getUser();
     final Map<String,String> env = launchContext.getAllEnv();
     final List<String> command = launchContext.getCommandList();
@@ -96,8 +96,6 @@ public class ContainerLaunch implements Callable<Integer> {
       // /////////////////////////// Variable expansion
       // Before the container script gets written out.
       List<String> newCmds = new ArrayList<String>(command.size());
-      String containerIdStr =
-          ConverterUtils.toString(container.getContainerID());
       String appIdStr = app.toString();
       Path containerLogDir =
           this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
@@ -172,7 +170,7 @@ public class ContainerLaunch implements Callable<Integer> {
             containerWorkDir, FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
 
         writeLaunchEnv(containerScriptOutStream, env, localResources,
-            command, appDirs);
+            launchContext.getCommandList(), appDirs);
         // /////////// End of writing out container-script
 
         // /////////// Write out the container-tokens in the nmPrivate space.
@@ -194,18 +192,6 @@ public class ContainerLaunch implements Callable<Integer> {
       ret =
           exec.launchContainer(container, nmPrivateContainerScriptPath,
               nmPrivateTokensPath, user, appIdStr, containerWorkDir);
-      if (ret == ExitCode.KILLED.getExitCode()) {
-        // If the process was killed, Send container_cleanedup_after_kill and
-        // just break out of this method.
-        dispatcher.getEventHandler().handle(
-            new ContainerExitEvent(launchContext.getContainerId(),
-                ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret));
-        return ret;
-      }
-
-      if (ret != 0) {
-        throw new ExitCodeException(ret, "Container failed");
-      }
     } catch (Throwable e) {
       LOG.warn("Failed to launch container", e);
       dispatcher.getEventHandler().handle(new ContainerExitEvent(
@@ -213,8 +199,25 @@ public class ContainerLaunch implements Callable<Integer> {
             ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
       return ret;
     }
-    LOG.info("Container " + container + " succeeded "
-        + launchContext.getContainerId());
+
+    if (ret == ExitCode.KILLED.getExitCode()) {
+      // If the process was killed, Send container_cleanedup_after_kill and
+      // just break out of this method.
+      dispatcher.getEventHandler().handle(
+            new ContainerExitEvent(launchContext.getContainerId(),
+                ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret));
+      return ret;
+    }
+
+    if (ret != 0) {
+      LOG.warn("Container exited with a non-zero exit code " + ret);
+      this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
+              launchContext.getContainerId(),
+              ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
+      return ret;
+    }
+
+    LOG.info("Container " + containerIdStr + " succeeded ");
     dispatcher.getEventHandler().handle(
         new ContainerEvent(launchContext.getContainerId(),
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));

+ 6 - 1
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -14,10 +14,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
@@ -46,6 +49,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   final ContainerExecutor containerExecutor;
   private final Dispatcher eventDispatcher;
+  private final Context context;
   private ResourceCalculatorPlugin resourceCalculatorPlugin;
 
   private long maxVmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
@@ -63,11 +67,12 @@ public class ContainersMonitorImpl extends AbstractService implements
           "limit : %d bytes; Physical %d bytes, limit %d bytes";
 
   public ContainersMonitorImpl(ContainerExecutor exec,
-      AsyncDispatcher dispatcher) {
+      AsyncDispatcher dispatcher, Context context) {
     super("containers-monitor");
 
     this.containerExecutor = exec;
     this.eventDispatcher = dispatcher;
+    this.context = context;
 
     this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
     this.containersToBeRemoved = new ArrayList<ContainerId>();

+ 13 - 6
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java

@@ -24,6 +24,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -66,12 +67,18 @@ public class ContainerPage extends NMView implements NMWebParams {
     protected void render(Block html) {
       ContainerId containerID =
         ConverterUtils.toContainerId(this.recordFactory, $(CONTAINER_ID));
-    Container container = this.nmContext.getContainers().get(containerID);
-    info("Container information")
-      ._("ContainerID", $(CONTAINER_ID))
-      ._("ContainerState", container.getContainerState())
-      ._("logs", ujoin("containerlogs", $(CONTAINER_ID)), "Link to logs");
-    html._(InfoBlock.class);
+      Container container = this.nmContext.getContainers().get(containerID);
+      ContainerStatus containerData = container.cloneAndGetContainerStatus();
+      info("Container information")
+        ._("ContainerID", $(CONTAINER_ID))
+        ._("ContainerState", container.getContainerState())
+        ._("ExitStatus", containerData.getExitStatus())
+        ._("Diagnostics", containerData.getDiagnostics())
+        ._("User", container.getUser())
+        ._("TotalMemoryNeeded",
+            container.getLaunchContext().getResource().getMemory())
+        ._("logs", ujoin("containerlogs", $(CONTAINER_ID)), "Link to logs");
+      html._(InfoBlock.class);
     }
   }
 }

+ 1 - 1
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java

@@ -120,7 +120,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
       // vmem limit
       long limit = 700000;
 
-      ContainersMonitorImpl test = new ContainersMonitorImpl(null, null);
+      ContainersMonitorImpl test = new ContainersMonitorImpl(null, null, null);
 
       // create process trees
       // tree rooted at 100 is over limit immediately, as it is

+ 0 - 4
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java

@@ -284,10 +284,6 @@ public class Application {
     Resources.subtractFrom(currentConsumption, containerResource);
   }
 
-  synchronized public void completedContainers(List<Container> containers) {
-    completedContainers.addAll(containers);
-  }
-
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.