浏览代码

YARN-11730. Mark unreported nodes as LOST on RM Startup/HA failover (#7049) Contributed by Arjun Mohnot.

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
Arjun Mohnot 7 月之前
父节点
当前提交
d8ca2dbe34

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1277,6 +1277,13 @@ public class YarnConfiguration extends Configuration {
       RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
   public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
 
+  /**
+   * Whether the RM marks included hosts that never registered as LOST (default: false).
+   */
+  public static final String ENABLE_TRACKING_FOR_UNREGISTERED_NODES =
+      RM_PREFIX + "enable-tracking-for-unregistered-nodes";
+  public static final boolean DEFAULT_ENABLE_TRACKING_FOR_UNREGISTERED_NODES = false;
+
   /**
    * Period in seconds of the poll timer task inside DecommissioningNodesWatcher
    * to identify and take care of DECOMMISSIONING nodes missing regular heart beat.

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5810,4 +5810,13 @@
     <value>30s</value>
   </property>
 
+  <property>
+    <description>
+      Whether the ResourceManager should mark nodes that are listed in the include
+      file but have not registered with it as LOST.
+      Decommissioned (excluded) nodes are not affected. Default is false.
+    </description>
+    <name>yarn.resourcemanager.enable-tracking-for-unregistered-nodes</name>
+    <value>false</value>
+  </property>
 </configuration>

+ 124 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

@@ -280,6 +280,7 @@ public class NodesListManager extends CompositeService implements
         StringUtils.join(",", hostsReader.getExcludedHosts()) + "}");
 
     handleExcludeNodeList(graceful, timeout);
+    markUnregisteredNodesAsLost(yarnConf);
   }
 
   private void setDecommissionedNMs() {
@@ -387,6 +388,115 @@ public class NodesListManager extends CompositeService implements
     updateInactiveNodes();
   }
 
+  /**
+   * Flags every included-but-never-registered host as LOST, provided the
+   * feature is switched on in the configuration.
+   *
+   * A host qualifies when it appears in the include list, is absent from the
+   * exclude list, and is unknown to the ResourceManager.
+   *
+   * The steps are as follows:
+   * 1. Return immediately when the feature flag is off.
+   * 2. Collect the hostnames of all nodes the RM already knows about.
+   * 3. Select included hosts that are neither known to the RM nor excluded.
+   * 4. Dispatch a LOST event for each selected host.
+   *
+   * @param yarnConf Configuration object that holds the YARN configurations.
+   */
+  private void markUnregisteredNodesAsLost(Configuration yarnConf) {
+    boolean trackingEnabled = yarnConf.getBoolean(
+        YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES,
+        YarnConfiguration.DEFAULT_ENABLE_TRACKING_FOR_UNREGISTERED_NODES);
+    if (!trackingEnabled) {
+      LOG.debug("Unregistered node tracking is disabled. " +
+          "Skipping marking unregistered nodes as LOST.");
+      return;
+    }
+
+    // Hostnames of every node the RM knows about, active or inactive
+    Set<String> knownHosts = gatherRegisteredHostNames();
+    // Handler through which the LOST events are sent
+    EventHandler handler = this.rmContext.getDispatcher().getEventHandler();
+
+    // Include/exclude lists as currently reported by the hosts reader
+    HostDetails details = hostsReader.getHostDetails();
+    Set<String> excludedHosts = details.getExcludedHosts();
+
+    // Phase 1: select included hosts that are neither known nor excluded
+    List<String> lostHosts = new ArrayList<>();
+    for (String host : details.getIncludedHosts()) {
+      if (knownHosts.contains(host) || excludedHosts.contains(host)) {
+        continue;
+      }
+      LOG.info("Lost node: {}", host);
+      lostHosts.add(host);
+    }
+
+    // Phase 2: dispatch one LOST event per selected host
+    for (String host : lostHosts) {
+      dispatchLostEvent(handler, host);
+    }
+
+    LOG.info("Successfully marked unregistered nodes as LOST");
+  }
+
+  /**
+   * Collects the hostname of every RMNode known to the RM, whether it sits in
+   * the active node map or in the inactive one. Only the value returned by
+   * getHostName() is recorded for each node.
+   *
+   * @return A set of registered hostnames.
+   */
+  private Set<String> gatherRegisteredHostNames() {
+    Set<String> hostNames = new HashSet<>();
+    LOG.info("Getting all the registered hostnames");
+
+    // Hosts of the nodes held in the RM's node map
+    this.rmContext.getRMNodes().values()
+        .forEach(rmNode -> hostNames.add(rmNode.getHostName()));
+
+    // Hosts of the nodes held in the RM's inactive-node map
+    this.rmContext.getInactiveRMNodes().values()
+        .forEach(rmNode -> hostNames.add(rmNode.getHostName()));
+
+    return hostNames;
+  }
+
+  /**
+   * Registers a placeholder RMNode for a lost host and dispatches EXPIRE to it.
+   *
+   * @param eventHandler The EventHandler used to dispatch the LOST event.
+   * @param lostNode     The hostname of the lost node for which the event is
+   *                     being dispatched.
+   */
+  private void dispatchLostEvent(EventHandler eventHandler, String lostNode) {
+    // Generate a NodeId for the lost node with a special port -2
+    NodeId nodeId = createLostNodeId(lostNode);
+    RMNodeEvent lostEvent = new RMNodeEvent(nodeId, RMNodeEventType.EXPIRE);
+    RMNodeImpl rmNode = new RMNodeImpl(nodeId, this.rmContext, lostNode, -2, -2,
+        new UnknownNode(lostNode), Resource.newInstance(0, 0), "unknown");
+
+    try {
+      // Set the node's timestamp for when it became untracked
+      rmNode.setUntrackedTimeStamp(Time.monotonicNow());
+      // Publish the node BEFORE dispatching: the event carries only the NodeId,
+      // so the handler presumably resolves it via these maps (verify in handler)
+      this.rmContext.getRMNodes().put(nodeId, rmNode);
+      this.rmContext.getInactiveRMNodes().put(nodeId, rmNode);
+
+      // Dispatch the LOST event to signal the node is no longer active
+      eventHandler.handle(lostEvent);
+      LOG.info("Successfully dispatched LOST event and deactivated node: {}, Node ID: {}",
+          lostNode, nodeId);
+    } catch (Exception e) {
+      // Roll back the placeholder; pass the exception so its stack trace is logged
+      this.rmContext.getRMNodes().remove(nodeId);
+      this.rmContext.getInactiveRMNodes().remove(nodeId);
+      LOG.error("Error dispatching LOST event for node: {}, Node ID: {}",
+          lostNode, nodeId, e);
+    }
+  }
+
   @VisibleForTesting
   public int getNodeRemovalCheckInterval() {
     return nodeRemovalCheckInterval;
@@ -711,6 +821,20 @@ public class NodesListManager extends CompositeService implements
     return NodeId.newInstance(host, -1);
   }
 
+  /**
+   * Builds the placeholder NodeId used for a host that is tracked as LOST.
+   *
+   * The id pairs the hostname with the special port value -2, which marks the
+   * node as lost in the cluster.
+   *
+   * @param host The hostname of the lost node.
+   * @return NodeId Unique identifier for the lost node, with the port set to -2.
+   */
+  public static NodeId createLostNodeId(String host) {
+    final int lostNodePort = -2; // marker port signifying a lost node
+    return NodeId.newInstance(host, lostNodePort);
+  }
+
   /**
    * A Node instance needed upon startup for populating inactive nodes Map.
    * It only knows its hostname/ip.

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -1603,6 +1603,18 @@ public class ResourceManager extends CompositeService
       int port = webApp.port();
       WebAppUtils.setRMWebAppPort(conf, port);
     }
+
+    // Refresh node state before the service startup to reflect the unregistered
+    // nodemanagers as LOST if the tracking for unregistered nodes flag is enabled.
+    // For HA setup, refreshNodes is already being called before the active
+    // transition.
+    Configuration yarnConf = getConfig();
+    if (!this.rmContext.isHAEnabled() && yarnConf.getBoolean(
+        YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES,
+        YarnConfiguration.DEFAULT_ENABLE_TRACKING_FOR_UNREGISTERED_NODES)) {
+      this.rmContext.getNodesListManager().refreshNodes(yarnConf);
+    }
+
     super.serviceStart();
 
     // Non HA case, start after RM services are started.
@@ -1610,7 +1622,7 @@ public class ResourceManager extends CompositeService
       transitionToActive();
     }
   }
-  
+
   protected void doSecureLogin() throws IOException {
 	InetSocketAddress socAddr = getBindAddress(conf);
     SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -224,6 +224,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
           RMNodeEventType.DECOMMISSION,
           new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+      .addTransition(NodeState.NEW, NodeState.LOST,
+          RMNodeEventType.EXPIRE,
+          new DeactivateNodeTransition(NodeState.LOST))
       .addTransition(NodeState.NEW, NodeState.NEW,
           RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
           new AddContainersToBeRemovedFromNMTransition())
@@ -958,6 +961,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         if (previousRMNode != null) {
           ClusterMetrics.getMetrics().decrDecommisionedNMs();
         }
+
+        // Check if the node was lost before
+        NodeId lostNodeId = NodesListManager.createLostNodeId(nodeId.getHost());
+        RMNode previousRMLostNode = rmNode.context.getInactiveRMNodes().remove(lostNodeId);
+        if (previousRMLostNode != null) {
+          // Remove the record of the lost node and update the metrics
+          rmNode.context.getRMNodes().remove(lostNodeId);
+          ClusterMetrics.getMetrics().decrNumLostNMs();
+        }
+
         containers = startEvent.getNMContainerStatuses();
         final Resource allocatedResource = Resource.newInstance(
             Resources.none());

+ 63 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -3303,4 +3303,67 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
 
     rm.close();
   }
+
+  /**
+   * Verifies that, with unregistered-node tracking enabled, included hosts that
+   * never register are reported as LOST and that the cluster metrics follow.
+   *
+   * @throws Exception if any unexpected behavior occurs
+   */
+  @Test
+  public void testMarkUnregisteredNodesAsLost() throws Exception {
+    // Step 1: Create a Configuration object to hold the settings.
+    Configuration conf = new Configuration();
+
+    // Step 2: Include test_host1..test_host4 through the include file.
+    writeToHostsFile(hostFile, "test_host1", "test_host2", "test_host3", "test_host4");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath());
+
+    // Exclude the following host: test_host4
+    writeToHostsFile(excludeHostFile, "test_host4");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath());
+
+    // Enable tracking for unregistered nodes in the ResourceManager configuration
+    conf.setBoolean(YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES, true);
+
+    // Step 3: Create a MockRM (ResourceManager) instance to simulate RM behavior
+    rm = new MockRM(conf);
+    RMContext rmContext = rm.getRMContext(); // Retrieve the ResourceManager context
+    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); // Get cluster metrics for nodes
+    rm.start(); // Start the ResourceManager instance
+
+    // Step 4: Register and simulate node activity for "test_host1"
+    TimeUnit.MILLISECONDS.sleep(50); // Allow some time for event dispatch
+    MockNM nm1 = rm.registerNode("test_host1:1234", 5120); // Register test_host1 with 5120MB
+    nm1.nodeHeartbeat(true); // Send heartbeat to simulate the node being alive
+    TimeUnit.MILLISECONDS.sleep(50); // Allow some time for event processing
+
+    // Step 5: Validate that test_host3 is tracked and marked as a LOST node
+    Assert.assertNotNull(clusterMetrics); // Ensure metrics are not null
+    Assert.assertNotNull("test_host3 should be tracked as an inactive NM!",
+        rmContext.getInactiveRMNodes().get(NodesListManager.createLostNodeId("test_host3")));
+    assertEquals("test_host3 should be a lost NM!", NodeState.LOST,
+        rmContext.getInactiveRMNodes().get(
+            NodesListManager.createLostNodeId("test_host3")).getState());
+
+    // Step 6: Validate node metrics for lost, active, and decommissioned nodes
+    // Two nodes are lost
+    assertEquals("There should be 2 Lost NM!", 2, clusterMetrics.getNumLostNMs());
+    // One node is active
+    assertEquals("There should be 1 Active NM!", 1, clusterMetrics.getNumActiveNMs());
+    // One node is decommissioned
+    assertEquals("There should be 1 Decommissioned NM!", 1,
+        clusterMetrics.getNumDecommisionedNMs());
+
+    // Step 7: Register and simulate node activity for "test_host3"
+    MockNM nm3 = rm.registerNode("test_host3:5678", 10240); // Register test_host3 with 10240MB
+    nm3.nodeHeartbeat(true); // Send heartbeat to simulate the node being alive
+    TimeUnit.MILLISECONDS.sleep(50); // Allow some time for event dispatch and processing
+
+    // Step 8: Validate updated node metrics after registering test_host3
+    assertEquals("There should be 1 Lost NM!", 1,
+        clusterMetrics.getNumLostNMs()); // Only one node is lost now
+    assertEquals("There should be 2 Active NM!", 2,
+        clusterMetrics.getNumActiveNMs()); // Two nodes are now active
+  }
 }