Selaa lähdekoodia

YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
after NM is reconnected. Contributed by zhihai xu

(cherry picked from commit 5b5bb8dcdc888ba1ebc7e4eba0fa0e7e79edda9a)

Xuan 10 vuotta sitten
vanhempi
commit
86b75ac544

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -491,6 +491,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3824. Fix two minor nits in member variable properties
     YARN-3824. Fix two minor nits in member variable properties
     of YarnConfiguration. (Ray Chiang via devaraj)
     of YarnConfiguration. (Ray Chiang via devaraj)
 
 
+    YARN-3802. Two RMNodes for the same NodeId are used in RM sometimes
+    after NM is reconnected. (zhihai xu via xgong)
+
 Release 2.7.1 - UNRELEASED
 Release 2.7.1 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -597,10 +597,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         if (rmNode.getHttpPort() == newNode.getHttpPort()) {
         if (rmNode.getHttpPort() == newNode.getHttpPort()) {
           // Reset heartbeat ID since node just restarted.
           // Reset heartbeat ID since node just restarted.
           rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
           rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+          if (!rmNode.getTotalCapability().equals(
+              newNode.getTotalCapability())) {
+            rmNode.totalCapability = newNode.getTotalCapability();
+          }
           if (rmNode.getState().equals(NodeState.RUNNING)) {
           if (rmNode.getState().equals(NodeState.RUNNING)) {
-            // Only add new node if old state is RUNNING
+            // Only add old node if old state is RUNNING
             rmNode.context.getDispatcher().getEventHandler().handle(
             rmNode.context.getDispatcher().getEventHandler().handle(
-                new NodeAddedSchedulerEvent(newNode));
+                new NodeAddedSchedulerEvent(rmNode));
           }
           }
         } else {
         } else {
           // Reconnected node differs, so replace old node and start new node
           // Reconnected node differs, so replace old node and start new node

+ 65 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java

@@ -25,6 +25,9 @@ import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
@@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@@ -39,10 +43,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDi
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -51,6 +58,8 @@ public class TestNMReconnect {
       RecordFactoryProvider.getRecordFactory(null);
       RecordFactoryProvider.getRecordFactory(null);
 
 
   private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
   private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
+  private Dispatcher dispatcher;
+  private RMContextImpl context;
 
 
   private class TestRMNodeEventDispatcher implements
   private class TestRMNodeEventDispatcher implements
       EventHandler<RMNodeEvent> {
       EventHandler<RMNodeEvent> {
@@ -68,12 +77,12 @@ public class TestNMReconnect {
   public void setUp() {
   public void setUp() {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     // Dispatcher that processes events inline
     // Dispatcher that processes events inline
-    Dispatcher dispatcher = new InlineDispatcher();
+    dispatcher = new InlineDispatcher();
 
 
     dispatcher.register(RMNodeEventType.class,
     dispatcher.register(RMNodeEventType.class,
         new TestRMNodeEventDispatcher());
         new TestRMNodeEventDispatcher());
 
 
-    RMContext context = new RMContextImpl(dispatcher, null,
+    context = new RMContextImpl(dispatcher, null,
         null, null, null, null, null, null, null, null);
         null, null, null, null, null, null, null, null);
     dispatcher.register(SchedulerEventType.class,
     dispatcher.register(SchedulerEventType.class,
         new InlineDispatcher.EmptyEventHandler());
         new InlineDispatcher.EmptyEventHandler());
@@ -99,6 +108,11 @@ public class TestNMReconnect {
     resourceTrackerService.start();
     resourceTrackerService.start();
   }
   }
 
 
+  @After
+  public void tearDown() {
+    resourceTrackerService.stop();
+  }
+
   @Test
   @Test
   public void testReconnect() throws Exception {
   public void testReconnect() throws Exception {
     String hostname1 = "localhost1";
     String hostname1 = "localhost1";
@@ -126,4 +140,53 @@ public class TestNMReconnect {
     Assert.assertEquals(RMNodeEventType.RECONNECTED,
     Assert.assertEquals(RMNodeEventType.RECONNECTED,
         rmNodeEvents.get(0).getType());
         rmNodeEvents.get(0).getType());
   }
   }
+
+  @Test
+  public void testCompareRMNodeAfterReconnect() throws Exception {
+    Configuration yarnConf = new YarnConfiguration();
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(yarnConf);
+    ConfigurationProvider configurationProvider =
+        ConfigurationProviderFactory.getConfigurationProvider(yarnConf);
+    configurationProvider.init(yarnConf);
+    context.setConfigurationProvider(configurationProvider);
+    RMNodeLabelsManager nlm = new RMNodeLabelsManager();
+    nlm.init(yarnConf);
+    nlm.start();
+    context.setNodeLabelManager(nlm);
+    scheduler.setRMContext(context);
+    scheduler.init(yarnConf);
+    scheduler.start();
+    dispatcher.register(SchedulerEventType.class, scheduler);
+
+    String hostname1 = "localhost1";
+    Resource capability = BuilderUtils.newResource(4096, 4);
+
+    RegisterNodeManagerRequest request1 = recordFactory
+        .newRecordInstance(RegisterNodeManagerRequest.class);
+    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+    request1.setNodeId(nodeId1);
+    request1.setHttpPort(0);
+    request1.setResource(capability);
+    resourceTrackerService.registerNodeManager(request1);
+    Assert.assertNotNull(context.getRMNodes().get(nodeId1));
+    // verify Scheduler and RMContext use same RMNode reference.
+    Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
+        context.getRMNodes().get(nodeId1));
+    Assert.assertEquals(context.getRMNodes().get(nodeId1).
+        getTotalCapability(), capability);
+    Resource capability1 = BuilderUtils.newResource(2048, 2);
+    request1.setResource(capability1);
+    resourceTrackerService.registerNodeManager(request1);
+    Assert.assertNotNull(context.getRMNodes().get(nodeId1));
+    // verify Scheduler and RMContext use same RMNode reference
+    // after reconnect.
+    Assert.assertTrue(scheduler.getSchedulerNode(nodeId1).getRMNode() ==
+        context.getRMNodes().get(nodeId1));
+    // verify RMNode's capability is changed.
+    Assert.assertEquals(context.getRMNodes().get(nodeId1).
+        getTotalCapability(), capability1);
+    nlm.stop();
+    scheduler.stop();
+  }
 }
 }