Explorar o código

MAPREDUCE-4951. Container preemption interpreted as task failure. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1448615 13f79535-47bb-0310-9956-ffa450edef68
Thomas White %!s(int64=12) %!d(string=hai) anos
pai
achega
0b73dde6ce

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -183,6 +183,9 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-5013. mapred.JobStatus compatibility: MR2 missing constructors
     from MR1. (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-4951. Container preemption interpreted as task failure.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -238,7 +238,6 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
-      // ^ If RM kills the container due to expiry, preemption etc. 
      .addTransition(TaskAttemptStateInternal.ASSIGNED, 
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)

+ 18 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -67,9 +67,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
@@ -606,8 +609,8 @@ public class RMContainerAllocator extends RMContainerRequestor
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
-        eventHandler.handle(new TaskAttemptEvent(attemptID,
-            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+        eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
+        
         // Send the diagnostics
         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
@@ -617,6 +620,19 @@ public class RMContainerAllocator extends RMContainerRequestor
     return newContainers;
   }
   
+  @VisibleForTesting
+  public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
+      TaskAttemptId attemptID) {
+    if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
+      // killed by framework
+      return new TaskAttemptEvent(attemptID,
+          TaskAttemptEventType.TA_KILL);
+    } else {
+      return new TaskAttemptEvent(attemptID,
+          TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    }
+  }
+  
   @SuppressWarnings("unchecked")
   private void handleUpdatedNodes(AMResponse response) {
     // send event to the job about on updated nodes

+ 27 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -1645,6 +1646,32 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(callbackCalled.get());
   }
 
+  @Test
+  public void testCompletedContainerEvent() {
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), mock(AppContext.class));
+    
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
+        MRBuilderUtils.newTaskId(
+            MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+    ContainerStatus status = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "", 0);
+
+    ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "",
+        YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
+    
+    TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event.getType());
+    
+    TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
+        abortedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+  }
+  
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();