Bladeren bron

svn merge -c 1381806,1381807,1381809,1381811 FIXES: YARN-75. Modified ResourceManager's RMContainer to handle a valid RELEASE event at RUNNING state. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1390695 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 12 jaren geleden
bovenliggende
commit
530c54d58e

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -21,6 +21,9 @@ Release 0.23.4 - UNRELEASED
     YARN-42. Modify NM's non-aggregating logs' handler to stop properly so that
     YARN-42. Modify NM's non-aggregating logs' handler to stop properly so that
     NMs don't get NPEs on startup errors. (Devaraj K via vinodkv)
     NMs don't get NPEs on startup errors. (Devaraj K via vinodkv)
 
 
+    YARN-75. Modified ResourceManager's RMContainer to handle a valid RELEASE
+    event at RUNNING state. (Siddharth Seth via vinodkv)
+
 Release 0.23.3 - Unreleased
 Release 0.23.3 - Unreleased
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java

@@ -152,9 +152,9 @@ public interface AllocateRequest {
   void clearAsks();
   void clearAsks();
 
 
   /**
   /**
-   * Get the list of <code>ContainerId</code> of unused containers being 
+   * Get the list of <code>ContainerId</code> of containers being 
    * released by the <code>ApplicationMaster</code>.
    * released by the <code>ApplicationMaster</code>.
-   * @return list of <code>ContainerId</code> of unused containers being 
+   * @return list of <code>ContainerId</code> of containers being 
    *         released by the <code>ApplicationMaster</code> 
    *         released by the <code>ApplicationMaster</code> 
    */
    */
   @Public
   @Public
@@ -170,9 +170,9 @@ public interface AllocateRequest {
   int getReleaseCount();
   int getReleaseCount();
 
 
   /**
   /**
-   * Add the list of <code>ContainerId</code> of unused containers being 
+   * Add the list of <code>ContainerId</code> of containers being 
    * released by the <code>ApplicationMaster</code>
    * released by the <code>ApplicationMaster</code>
-   * @param releaseContainers list of <code>ContainerId</code> of unused 
+   * @param releaseContainers list of <code>ContainerId</code> of 
    *                          containers being released by the <
    *                          containers being released by the <
    *                          code>ApplicationMaster</code>
    *                          code>ApplicationMaster</code>
    */
    */

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class RMContainerImpl implements RMContainer {
 public class RMContainerImpl implements RMContainer {
 
 
   private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
   private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
@@ -95,6 +96,8 @@ public class RMContainerImpl implements RMContainer {
         RMContainerEventType.FINISHED, new FinishedTransition())
         RMContainerEventType.FINISHED, new FinishedTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED,
     .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED,
         RMContainerEventType.KILL, new KillTransition())
         RMContainerEventType.KILL, new KillTransition())
+    .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
+        RMContainerEventType.RELEASED, new KillTransition())
 
 
     // Transitions from COMPLETED state
     // Transitions from COMPLETED state
     .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED,
     .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED,
@@ -106,11 +109,13 @@ public class RMContainerImpl implements RMContainer {
 
 
     // Transitions from RELEASED state
     // Transitions from RELEASED state
     .addTransition(RMContainerState.RELEASED, RMContainerState.RELEASED,
     .addTransition(RMContainerState.RELEASED, RMContainerState.RELEASED,
-        EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL))
+        EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL,
+            RMContainerEventType.FINISHED))
 
 
     // Transitions from KILLED state
     // Transitions from KILLED state
     .addTransition(RMContainerState.KILLED, RMContainerState.KILLED,
     .addTransition(RMContainerState.KILLED, RMContainerState.KILLED,
-        EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL))
+        EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL,
+            RMContainerEventType.FINISHED))
 
 
     // create the topology tables
     // create the topology tables
     .installTopology(); 
     .installTopology(); 

+ 119 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestRMContainerImpl {
+
+  @Test
+  public void testReleaseWhileRunning() {
+
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    EventHandler eventHandler = drainDispatcher.getEventHandler();
+    EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(EventHandler.class);
+    EventHandler generic = mock(EventHandler.class);
+    drainDispatcher.register(RMAppAttemptEventType.class,
+        appAttemptEventHandler);
+    drainDispatcher.register(RMNodeEventType.class, generic);
+    drainDispatcher.init(new YarnConfiguration());
+    drainDispatcher.start();
+    NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
+
+    Resource resource = BuilderUtils.newResource(512);
+    Priority priority = BuilderUtils.newPriority(5);
+
+    Container container = BuilderUtils.newContainer(containerId, nodeId,
+        "host:3465", resource, priority, null);
+
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        nodeId, eventHandler, expirer);
+
+    assertEquals(RMContainerState.NEW, rmContainer.getState());
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.START));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.ACQUIRED));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // In RUNNING state. Verify RELEASED and associated actions.
+    reset(appAttemptEventHandler);
+    ContainerStatus containerStatus = SchedulerUtils
+        .createAbnormalContainerStatus(containerId,
+            SchedulerUtils.RELEASED_CONTAINER);
+    rmContainer.handle(new RMContainerFinishedEvent(containerId,
+        containerStatus, RMContainerEventType.RELEASED));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.RELEASED, rmContainer.getState());
+
+    ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor
+        .forClass(RMAppAttemptContainerFinishedEvent.class);
+    verify(appAttemptEventHandler).handle(captor.capture());
+    RMAppAttemptContainerFinishedEvent cfEvent = captor.getValue();
+    assertEquals(appAttemptId, cfEvent.getApplicationAttemptId());
+    assertEquals(containerStatus, cfEvent.getContainerStatus());
+    assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType());
+    
+    // In RELEASED state. A FINIHSED event may come in.
+    rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils
+        .createAbnormalContainerStatus(containerId, "FinishedContainer"),
+        RMContainerEventType.FINISHED));
+    assertEquals(RMContainerState.RELEASED, rmContainer.getState());
+  }
+
+}

+ 0 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java.orig