Browse Source

YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels. (Bibin A Chundatt via wangda)

(cherry picked from commit 30ac69c6bd3db363248d6c742561371576006dab)
Wangda Tan 9 years ago
parent
commit
d24a3b9a3c

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -434,6 +434,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4095. Avoid sharing AllocatorPerContext object in LocalDirAllocator
     between ShuffleHandler and LocalDirsHandlerService. (Zhihai Xu via jlowe)
 
+    YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels. 
+    (Bibin A Chundatt via wangda)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2019,6 +2019,12 @@ public class YarnConfiguration extends Configuration {
   private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
       NM_NODE_LABELS_PREFIX + "provider.";
 
+  public static final String NM_NODE_LABELS_RESYNC_INTERVAL =
+      NM_NODE_LABELS_PREFIX + "resync-interval-ms";
+
+  public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL =
+      2 * 60 * 1000;
+
   // If -1 is configured then no timer task should be created
   public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
       NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2137,6 +2137,15 @@
     <value>600000</value>
   </property>
 
+  <property>
+    <description>
+   Interval at which node labels syncs with RM from NM.Will send loaded labels
+   every x intervals configured along with heartbeat from NM to RM.
+    </description>
+    <name>yarn.nodemanager.node-labels.resync-interval-ms</name>
+    <value>120000</value>
+  </property>
+
   <property>
     <description>
     When node labels "yarn.nodemanager.node-labels.provider"

+ 35 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -139,6 +139,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
 
   private NMNodeLabelsHandler nodeLabelsHandler;
+  private final NodeLabelsProvider nodeLabelsProvider;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -150,9 +151,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       NodeLabelsProvider nodeLabelsProvider) {
     super(NodeStatusUpdaterImpl.class.getName());
     this.healthChecker = healthChecker;
-    nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
     this.context = context;
     this.dispatcher = dispatcher;
+    this.nodeLabelsProvider = nodeLabelsProvider;
     this.metrics = metrics;
     this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
     this.pendingCompletedContainers =
@@ -184,7 +185,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.minimumResourceManagerVersion = conf.get(
         YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
         YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
-    
+
+    nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
     // Default duration to track stopped containers on nodemanager is 10Min.
     // This should not be assigned very large value as it will remember all the
     // containers stopped during that time.
@@ -871,7 +873,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     if (nodeLabelsProvider == null) {
       return new NMCentralizedNodeLabelsHandler();
     } else {
-      return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
+      return new NMDistributedNodeLabelsHandler(nodeLabelsProvider,
+          this.getConfig());
     }
   }
 
@@ -936,16 +939,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private static class NMDistributedNodeLabelsHandler
       implements NMNodeLabelsHandler {
     private NMDistributedNodeLabelsHandler(
-        NodeLabelsProvider nodeLabelsProvider) {
+        NodeLabelsProvider nodeLabelsProvider, Configuration conf) {
       this.nodeLabelsProvider = nodeLabelsProvider;
+      this.resyncInterval =
+          conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL,
+              YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL);
     }
 
     private final NodeLabelsProvider nodeLabelsProvider;
     private Set<NodeLabel> previousNodeLabels;
-    private boolean updatedLabelsSentToRM;
-    private long lastNodeLabelSendFailMills = 0L;
-    // TODO : Need to check which conf to use.Currently setting as 1 min
-    private static final long FAILEDLABELRESENDINTERVAL = 60000;
+    private boolean areLabelsSentToRM;
+    private long lastNodeLabelSendMills = 0L;
+    private final long resyncInterval;
 
     @Override
     public Set<NodeLabel> getNodeLabelsForRegistration() {
@@ -987,22 +992,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       // take some action only on modification of labels
       boolean areNodeLabelsUpdated =
           nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
-              || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat)
-              || checkResendLabelOnFailure();
+              || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
 
-      updatedLabelsSentToRM = false;
-      if (areNodeLabelsUpdated) {
+      areLabelsSentToRM = false;
+      // When nodelabels elapsed or resync time is elapsed will send again in
+      // heartbeat.
+      if (areNodeLabelsUpdated || isResyncIntervalElapsed()) {
         previousNodeLabels = nodeLabelsForHeartbeat;
         try {
-          LOG.info("Modified labels from provider: "
-              + StringUtils.join(",", previousNodeLabels));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Labels from provider: "
+                + StringUtils.join(",", previousNodeLabels));
+          }
           validateNodeLabels(nodeLabelsForHeartbeat);
-          updatedLabelsSentToRM = true;
+          areLabelsSentToRM = true;
         } catch (IOException e) {
           // set previous node labels to invalid set, so that invalid
           // labels are not verified for every HB, and send empty set
           // to RM to have same nodeLabels which was earlier set.
           nodeLabelsForHeartbeat = null;
+        } finally {
+          // Set last send time in heartbeat
+          lastNodeLabelSendMills = System.currentTimeMillis();
         }
       } else {
         // if nodelabels have not changed then no need to send
@@ -1033,16 +1044,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
 
     /*
-     * In case of failure when RM doesnt accept labels need to resend Labels to
-     * RM. This method checks whether we need to resend
+     * This method checks resync interval is elapsed or not.
      */
-    public boolean checkResendLabelOnFailure() {
-      if (lastNodeLabelSendFailMills > 0L) {
-        long lastFailTimePassed =
-            System.currentTimeMillis() - lastNodeLabelSendFailMills;
-        if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) {
-          return true;
-        }
+    public boolean isResyncIntervalElapsed() {
+      long elapsedTimeSinceLastSync =
+          System.currentTimeMillis() - lastNodeLabelSendMills;
+      if (elapsedTimeSinceLastSync > resyncInterval) {
+        return true;
       }
       return false;
     }
@@ -1050,15 +1058,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     @Override
     public void verifyRMHeartbeatResponseForNodeLabels(
         NodeHeartbeatResponse response) {
-      if (updatedLabelsSentToRM) {
-        if (response.getAreNodeLabelsAcceptedByRM()) {
-          lastNodeLabelSendFailMills = 0L;
-          LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+      if (areLabelsSentToRM) {
+        if (response.getAreNodeLabelsAcceptedByRM() && LOG.isDebugEnabled()) {
+          LOG.debug("Node Labels {" + StringUtils.join(",", previousNodeLabels)
               + "} were Accepted by RM ");
         } else {
           // case where updated labels from NodeLabelsProvider is sent to RM and
           // RM rejected the labels
-          lastNodeLabelSendFailMills = System.currentTimeMillis();
           LOG.error(
               "NM node labels {" + StringUtils.join(",", previousNodeLabels)
                   + "} were not accepted by RM and message from RM : "

+ 24 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java

@@ -250,6 +250,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
     };
 
     YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
+    conf.setLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, 2000);
     nm.init(conf);
     resourceTracker.resetNMHeartbeatReceiveFlag();
     nm.start();
@@ -288,7 +289,29 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
     assertTrue("If provider sends null then empty labels should be sent",
         resourceTracker.labels.isEmpty());
     resourceTracker.resetNMHeartbeatReceiveFlag();
-
+    // Since the resync interval is set to 2 sec in every alternate heartbeat
+    // the labels will be send along with heartbeat.In loop we sleep for 1 sec
+    // so that every sec 1 heartbeat is send.
+    int nullLabels = 0;
+    int nonNullLabels = 0;
+    dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1"));
+    for (int i = 0; i < 5; i++) {
+      nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+      resourceTracker.waitTillHeartbeat();
+      if (null == resourceTracker.labels) {
+        nullLabels++;
+      } else {
+        Assert.assertEquals("In heartbeat PI labels should be send",
+            toNodeLabelSet("P1"), resourceTracker.labels);
+        nonNullLabels++;
+      }
+      resourceTracker.resetNMHeartbeatReceiveFlag();
+      Thread.sleep(1000);
+    }
+    Assert.assertTrue("More than one heartbeat with empty labels expected",
+        nullLabels > 1);
+    Assert.assertTrue("More than one heartbeat with labels expected",
+        nonNullLabels > 1);
     nm.stop();
   }