瀏覽代碼

YARN-1343. NodeManagers additions/restarts are not reported as node updates in AllocateResponse responses to AMs. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.2@1537370 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 11 年之前
父節點
當前提交
ec2968ffc5

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -71,6 +71,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1358. TestYarnCLI fails on Windows due to line endings. (Chuan Liu via
     cnauroth)
 
+    YARN-1343. NodeManagers additions/restarts are not reported as node updates 
+    in AllocateResponse responses to AMs. (tucu)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 8 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

@@ -160,17 +160,14 @@ public class NodesListManager extends AbstractService implements
       if (unusableRMNodesConcurrentSet.contains(eventNode)) {
         LOG.debug(eventNode + " reported usable");
         unusableRMNodesConcurrentSet.remove(eventNode);
-        for (RMApp app : rmContext.getRMApps().values()) {
-          this.rmContext
-              .getDispatcher()
-              .getEventHandler()
-              .handle(
-                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                      RMAppNodeUpdateType.NODE_USABLE));
-        }
-      } else {
-        LOG.warn(eventNode
-            + " reported usable without first reporting unusable");
+      }
+      for (RMApp app : rmContext.getRMApps().values()) {
+        this.rmContext
+            .getDispatcher()
+            .getEventHandler()
+            .handle(
+                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                    RMAppNodeUpdateType.NODE_USABLE));
       }
       break;
     default:

+ 8 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -431,7 +431,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeAddedSchedulerEvent(rmNode));
-      
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_USABLE, rmNode));
+ 
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
         // Old node rejoining
@@ -464,7 +467,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           // Only add new node if old state is not UNHEALTHY
           rmNode.context.getDispatcher().getEventHandler().handle(
               new NodeAddedSchedulerEvent(rmNode));
-         }
+        }
       } else {
         // Reconnected node differs, so replace old node and start new node
         switch (rmNode.getState()) {
@@ -479,6 +482,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         rmNode.context.getDispatcher().getEventHandler().handle(
             new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
       }
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_USABLE, rmNode));
     }
   }
 

+ 76 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -79,6 +81,18 @@ public class TestRMNodeTransitions {
     }
   }
 
+  private NodesListManagerEvent nodesListManagerEvent = null;
+  
+  private class TestNodeListManagerEventDispatcher implements
+      EventHandler<NodesListManagerEvent> {
+    
+    @Override
+    public void handle(NodesListManagerEvent event) {
+      nodesListManagerEvent = event;
+    }
+
+  }
+
   @Before
   public void setUp() throws Exception {
     InlineDispatcher rmDispatcher = new InlineDispatcher();
@@ -109,8 +123,12 @@ public class TestRMNodeTransitions {
     rmDispatcher.register(SchedulerEventType.class, 
         new TestSchedulerEventDispatcher());
     
+    rmDispatcher.register(NodesListManagerEventType.class,
+        new TestNodeListManagerEventDispatcher());
+
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+    nodesListManagerEvent =  null;
 
   }
   
@@ -389,8 +407,9 @@ public class TestRMNodeTransitions {
 
   private RMNodeImpl getRunningNode() {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
-        null, null);
+        null, capability);
     node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
@@ -405,4 +424,60 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
     return node;
   }
+
+
+  private RMNodeImpl getNewNode() {
+    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+    return node;
+  }
+
+  @Test
+  public void testAdd() {
+    RMNodeImpl node = getNewNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, 
+        nodesListManagerEvent.getType());
+  }
+
+  @Test
+  public void testReconnect() {
+    RMNodeImpl node = getRunningNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+        nodesListManagerEvent.getType());
+  }
+
 }

+ 124 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java

@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMReconnect {
+  private static final RecordFactory recordFactory = 
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private RMNodeEvent rmNodeEvent = null;
+
+  private class TestRMNodeEventDispatcher implements
+      EventHandler<RMNodeEvent> {
+
+    @Override
+    public void handle(RMNodeEvent event) {
+      rmNodeEvent = event;
+    }
+
+  }
+
+  ResourceTrackerService resourceTrackerService;
+
+  @Before
+  public void setUp() {
+    Configuration conf = new Configuration();
+    // Dispatcher that processes events inline
+    Dispatcher dispatcher = new InlineDispatcher();
+
+    dispatcher.register(RMNodeEventType.class,
+        new TestRMNodeEventDispatcher());
+
+    RMContext context = new RMContextImpl(dispatcher, null,
+        null, null, null, null, null, null, null);
+    dispatcher.register(SchedulerEventType.class,
+        new InlineDispatcher.EmptyEventHandler());
+    dispatcher.register(RMNodeEventType.class,
+        new NodeEventDispatcher(context));
+    NMLivelinessMonitor nmLivelinessMonitor = new NMLivelinessMonitor(
+        dispatcher);
+    nmLivelinessMonitor.init(conf);
+    nmLivelinessMonitor.start();
+    NodesListManager nodesListManager = new NodesListManager(context);
+    nodesListManager.init(conf);
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    containerTokenSecretManager.start();
+    NMTokenSecretManagerInRM nmTokenSecretManager =
+        new NMTokenSecretManagerInRM(conf);
+    nmTokenSecretManager.start();
+    resourceTrackerService = new ResourceTrackerService(context,
+        nodesListManager, nmLivelinessMonitor, containerTokenSecretManager,
+        nmTokenSecretManager);
+    
+    resourceTrackerService.init(conf);
+    resourceTrackerService.start();
+  }
+
+  @Test
+  public void testReconnect() throws Exception {
+    String hostname1 = "localhost1";
+    Resource capability = BuilderUtils.newResource(1024, 1);
+
+    RegisterNodeManagerRequest request1 = recordFactory
+        .newRecordInstance(RegisterNodeManagerRequest.class);
+    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+    request1.setNodeId(nodeId1);
+    request1.setHttpPort(0);
+    request1.setResource(capability);
+    resourceTrackerService.registerNodeManager(request1);
+
+    Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvent.getType());
+
+    rmNodeEvent = null;
+    resourceTrackerService.registerNodeManager(request1);
+    Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType());
+
+    rmNodeEvent = null;
+    resourceTrackerService.registerNodeManager(request1);
+    capability = BuilderUtils.newResource(1024, 2);
+    request1.setResource(capability);
+    Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType());
+  }
+}