
MAPREDUCE-3730. Modified RM to allow restarted NMs to be able to join the cluster without waiting for expiry. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1293436 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli · 13 years ago · commit c0572656ce
11 changed files with 228 additions and 21 deletions
  1. +3 -0    hadoop-mapreduce-project/CHANGES.txt
  2. +10 -9   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  3. +1 -0    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
  4. +40 -1   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  5. +34 -0   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
  6. +4 -1    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  7. +4 -1    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  8. +9 -6    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  9. +35 -0   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
  10. +61 -1  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  11. +27 -2  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -111,6 +111,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3866. Fixed the bin/yarn script to not print the command line
     unnecessarily. (vinodkv)
 
+    MAPREDUCE-3730. Modified RM to allow restarted NMs to be able to join the
+    cluster without waiting for expiry. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 10 - 9
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -177,17 +178,17 @@ public class ResourceTrackerService extends AbstractService implements
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
         resolve(host), capability);
 
-    if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
-      LOG.info("Duplicate registration from the node at: " + host
-          + ", Sending SHUTDOWN Signal to the NodeManager");
-      regResponse.setNodeAction(NodeAction.SHUTDOWN);
-      response.setRegistrationResponse(regResponse);
-      return response;
+    RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+    if (oldNode == null) {
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+    } else {
+      LOG.info("Reconnect from the node at: " + host);
+      this.nmLivelinessMonitor.unregister(nodeId);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeReconnectEvent(nodeId, rmNode));
     }
 
-    this.rmContext.getDispatcher().getEventHandler().handle(
-        new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
-
     this.nmLivelinessMonitor.register(nodeId);
 
     LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
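
In this hunk the duplicate-registration case no longer answers with SHUTDOWN: the return value of putIfAbsent() decides whether to dispatch STARTED or the new RECONNECTED event, and the node is first unregistered from the NM liveliness monitor so the restarted NM does not have to wait out the old expiry timer. A minimal sketch of the register-versus-reconnect split, assuming only JDK types (RegistrationTracker and its enum are hypothetical stand-ins, not the YARN classes):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for illustration only; these are not the YARN classes.
public class RegistrationTracker {
  enum NodeEvent { STARTED, RECONNECTED }

  private final ConcurrentMap<String, Integer> nodes =
      new ConcurrentHashMap<String, Integer>();

  // putIfAbsent returns null only for a brand-new node; a non-null return means the
  // node id is already registered, so the request is treated as a reconnect instead
  // of being rejected with SHUTDOWN.
  NodeEvent register(String nodeId, int memoryMB) {
    Integer previous = nodes.putIfAbsent(nodeId, memoryMB);
    return (previous == null) ? NodeEvent.STARTED : NodeEvent.RECONNECTED;
  }

  public static void main(String[] args) {
    RegistrationTracker tracker = new RegistrationTracker();
    System.out.println(tracker.register("host1:1234", 5120)); // STARTED
    System.out.println(tracker.register("host1:1234", 5120)); // RECONNECTED
  }
}
```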

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java

@@ -28,6 +28,7 @@ public enum RMNodeEventType {
   // ResourceTrackerService
   STATUS_UPDATE,
   REBOOTING,
+  RECONNECTED,
 
   // Source: Application
   CLEANUP_APP,

+ 40 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -110,9 +110,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            RMNodeEventType,
                                            RMNodeEvent>(RMNodeState.NEW)
   
-     //Transitions from RUNNING state
+     //Transitions from NEW state
      .addTransition(RMNodeState.NEW, RMNodeState.RUNNING, 
          RMNodeEventType.STARTED, new AddNodeTransition())
+
+     //Transitions from RUNNING state
      .addTransition(RMNodeState.RUNNING, 
          EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@@ -129,11 +131,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+     .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
+         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
 
      //Transitions from UNHEALTHY state
      .addTransition(RMNodeState.UNHEALTHY, 
          EnumSet.of(RMNodeState.UNHEALTHY, RMNodeState.RUNNING),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+     .addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
+         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
          
      // create the topology tables
      .installTopology(); 
@@ -372,6 +378,39 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
   
+  public static class ReconnectNodeTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // Kill containers since node is rejoining.
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodeRemovedSchedulerEvent(rmNode));
+
+      RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
+          && rmNode.getHttpPort() == newNode.getHttpPort()) {
+        // Reset heartbeat ID since node just restarted.
+        rmNode.getLastHeartBeatResponse().setResponseId(0);
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeAddedSchedulerEvent(rmNode));
+      } else {
+        // Reconnected node differs, so replace old node and start new node
+        switch (rmNode.getState()) {
+        case RUNNING:
+          ClusterMetrics.getMetrics().decrNumActiveNodes();
+          break;
+        case UNHEALTHY:
+          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+          break;
+        }
+        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+      }
+    }
+  }
+
   public static class CleanUpAppTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 

+ 34 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java

@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMNodeReconnectEvent extends RMNodeEvent {
+  private RMNode reconnectedNode;
+
+  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+    super(nodeId, RMNodeEventType.RECONNECTED);
+    reconnectedNode = newNode;
+  }
+
+  public RMNode getReconnectedNode() {
+    return reconnectedNode;
+  }
+}
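
RMNodeReconnectEvent simply carries the freshly built RMNode into the state machine, where the ReconnectNodeTransition added to RMNodeImpl above compares it with the node the RM already knows. A simplified sketch of that comparison, assuming hypothetical stand-in types rather than the YARN classes:

```java
// Hypothetical stand-ins for illustration only; these are not the YARN classes.
public class ReconnectDecision {
  static final class NodeInfo {
    final int memoryMB;
    final int httpPort;
    NodeInfo(int memoryMB, int httpPort) {
      this.memoryMB = memoryMB;
      this.httpPort = httpPort;
    }
  }

  enum Action { RESET_AND_READD, REPLACE_AND_RESTART }

  // Same capability and HTTP port: keep the existing node object, reset its heartbeat
  // id, and re-add it to the scheduler. Anything else: replace the old node with the
  // new one and run the normal STARTED path again.
  static Action onReconnect(NodeInfo oldNode, NodeInfo newNode) {
    boolean unchanged = oldNode.memoryMB == newNode.memoryMB
        && oldNode.httpPort == newNode.httpPort;
    return unchanged ? Action.RESET_AND_READD : Action.REPLACE_AND_RESTART;
  }

  public static void main(String[] args) {
    System.out.println(onReconnect(new NodeInfo(4096, 8042), new NodeInfo(4096, 8042)));
    System.out.println(onReconnect(new NodeInfo(4096, 8042), new NodeInfo(2048, 8042)));
  }
}
```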

+ 4 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -666,7 +666,10 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
-    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    if (node == null) {
+      return;
+    }
+    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
     --numNodeManagers;
 

+ 4 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -731,6 +731,9 @@ public class FifoScheduler implements ResourceScheduler {
 
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
+    if (node == null) {
+      return;
+    }
     // Kill running containers
     for(RMContainer container : node.getRunningContainers()) {
       containerCompleted(container, 
@@ -744,7 +747,7 @@ public class FifoScheduler implements ResourceScheduler {
     this.nodes.remove(nodeInfo.getNodeID());
     
     // Update cluster metrics
-    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
   }
 
   @Override
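
Both schedulers gain the same defensive guard: a reconnect can fire a NodeRemovedSchedulerEvent for a node the scheduler no longer tracks (for example, an unhealthy node that reconnects after it was already removed), so removeNode() has to tolerate an unknown node and subtract the capability it recorded itself rather than whatever the incoming event reports. A small sketch of that pattern, using plain JDK types with hypothetical names (not the scheduler classes):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the CapacityScheduler/FifoScheduler classes.
public class NodeRegistry {
  private final Map<String, Integer> nodeMemoryMB = new HashMap<String, Integer>();
  private int clusterMemoryMB = 0;

  void addNode(String nodeId, int memoryMB) {
    nodeMemoryMB.put(nodeId, memoryMB);
    clusterMemoryMB += memoryMB;
  }

  void removeNode(String nodeId) {
    Integer memoryMB = nodeMemoryMB.remove(nodeId);
    if (memoryMB == null) {
      return;                      // node already gone (e.g. during a reconnect): no-op
    }
    clusterMemoryMB -= memoryMB;   // subtract what this registry recorded, not the caller's view
  }

  public static void main(String[] args) {
    NodeRegistry registry = new NodeRegistry();
    registry.addNode("host1:1234", 4096);
    registry.removeNode("host1:1234");
    registry.removeNode("host1:1234");          // second removal is harmless
    System.out.println(registry.clusterMemoryMB); // 0
  }
}
```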

+ 9 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -19,23 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 
 import com.google.common.collect.Lists;
 
@@ -195,8 +190,12 @@ public class MockNodes {
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
+    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++);
+  }
+
+  private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr, int hostnum) {
     final String rackName = "rack"+ rack;
-    final int nid = NODE_ID++;
+    final int nid = hostnum;
     final String hostName = "host"+ nid;
     final int port = 123;
     final NodeId nodeID = newNodeID(hostName, port);
@@ -219,4 +218,8 @@ public class MockNodes {
   public static RMNode newNodeInfo(int rack, final Resource perNode) {
     return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0");
   }
+
+  public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
+    return buildRMNode(rack, perNode, null, "localhost:0", hostnum);
+  }
 }

+ 35 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import junit.framework.Assert;
@@ -27,10 +28,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -167,10 +175,37 @@ public class TestFifoScheduler {
     testMinimumAllocation(conf);
   }
 
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setQueues("default", new String[] {"default"});
+    conf.setCapacity("default", 100);
+    FifoScheduler fs = new FifoScheduler();
+    fs.reinitialize(conf, null, null);
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    fs.handle(new NodeAddedSchedulerEvent(n2));
+    List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
+    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
+
+    // reconnect n1 with downgraded memory
+    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    fs.handle(new NodeRemovedSchedulerEvent(n1));
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+
+    Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+  }
+
   public static void main(String[] args) throws Exception {
     TestFifoScheduler t = new TestFifoScheduler();
     t.test();
     t.testDefaultMinimumAllocation();
     t.testNonDefaultMinimumAllocation();
+    t.testReconnectedNode();
   }
 }

+ 61 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -31,12 +31,17 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Test;
@@ -189,7 +194,7 @@ public class TestResourceTrackerService {
     conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
         .getAbsolutePath());
 
-    MockRM rm = new MockRM(conf);
+    rm = new MockRM(conf);
     rm.start();
 
     MockNM nm1 = rm.registerNode("host1:1234", 5120);
@@ -223,6 +228,61 @@ public class TestResourceTrackerService {
         ClusterMetrics.getMetrics().getUnhealthyNMs());
   }
 
+  @Test
+  public void testReconnectNode() throws Exception {
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM() {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event);
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 5120);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(false);
+    checkUnealthyNMCount(rm, nm2, true, 1);
+    final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
+    QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
+    Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
+
+    // reconnect of healthy node
+    nm1 = rm.registerNode("host1:1234", 5120);
+    HeartbeatResponse response = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+    checkUnealthyNMCount(rm, nm2, true, 1);
+
+    // reconnect of unhealthy node
+    nm2 = rm.registerNode("host2:5678", 5120);
+    response = nm2.nodeHeartbeat(false);
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+    checkUnealthyNMCount(rm, nm2, true, 1);
+
+    // reconnect of node with changed capability
+    nm1 = rm.registerNode("host2:5678", 10240);
+    dispatcher.await();
+    response = nm2.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
     if (!hostFile.exists()) {
       TEMP_DIR.mkdirs();
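
The test wires in a DrainDispatcher plus a synchronous scheduler event handler so that dispatcher.await() returns only after every queued RM event has been handled, which keeps the metrics assertions deterministic. A rough, self-contained illustration of that drain idea, with a hypothetical MiniDrainDispatcher that is not the YARN DrainDispatcher API:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not the org.apache.hadoop.yarn.event.DrainDispatcher API.
public class MiniDrainDispatcher {
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  private final AtomicInteger pending = new AtomicInteger(0);

  public MiniDrainDispatcher() {
    Thread worker = new Thread(new Runnable() {
      public void run() {
        try {
          while (true) {
            Runnable event = queue.take(); // block until an event arrives
            event.run();                   // handle it
            pending.decrementAndGet();     // count it as drained only after handling
          }
        } catch (InterruptedException ignored) {
          // test shutdown
        }
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  public void dispatch(Runnable event) {
    pending.incrementAndGet();
    queue.add(event);
  }

  // Returns only once every dispatched event has been fully handled.
  public void await() throws InterruptedException {
    while (pending.get() > 0) {
      Thread.sleep(10);
    }
  }

  public static void main(String[] args) throws Exception {
    MiniDrainDispatcher dispatcher = new MiniDrainDispatcher();
    final int[] handled = new int[1];
    for (int i = 0; i < 100; i++) {
      dispatcher.dispatch(new Runnable() { public void run() { handled[0]++; } });
    }
    dispatcher.await();
    System.out.println("handled = " + handled[0]); // 100, deterministically
  }
}
```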

+ 27 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@@ -41,12 +42,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+  private final int GB = 1024;
   
   private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
   private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
@@ -97,8 +101,6 @@ public class TestCapacityScheduler {
 
     LOG.info("--- START: testCapacityScheduler ---");
         
-    final int GB = 1024;
-    
     // Register node1
     String host_0 = "host_0";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = 
@@ -340,4 +342,27 @@ public class TestCapacityScheduler {
     cs.reinitialize(conf, null, null);
   }
 
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.reinitialize(csConf, null, null);
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+
+    Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory());
+
+    // reconnect n1 with downgraded memory
+    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+
+    Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
+  }
 }