
YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda)

(cherry picked from commit bcc85e3bab78bcacd430eac23141774465b96ef9)
Wangda Tan, 10 years ago
parent
commit
9f97b86816
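
For context: NodesListManager.isValidNode() runs on the ResourceTracker path for every NodeManager heartbeat, and before this patch it called NetUtils.normalizeHostName() on each call, i.e. one DNS lookup per heartbeat. The snippet below is a minimal, self-contained illustration of that per-call cost; it reimplements the lookup with plain InetAddress, which is roughly what NetUtils.normalizeHostName does (falling back to the host name itself when resolution fails). It is an illustration only, not code from the patch.

import java.net.InetAddress;
import java.net.UnknownHostException;

public class PerHeartbeatLookup {
  // Approximation of the NetUtils.normalizeHostName() call that used to run
  // on every heartbeat: a DNS resolution, falling back to the raw host name.
  static String normalize(String hostName) {
    try {
      return InetAddress.getByName(hostName).getHostAddress();
    } catch (UnknownHostException e) {
      return hostName;
    }
  }

  public static void main(String[] args) {
    // With thousands of NMs heartbeating every few seconds, doing this on
    // every heartbeat is wasted work when the answer rarely changes.
    System.out.println(normalize("localhost"));
  }
}

The patch makes that lookup pluggable via a Resolver interface and adds an optional CachedResolver, controlled by the new yarn.resourcemanager.node-ip-cache.expiry-interval-secs setting, as shown in the hunks below.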

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -761,6 +761,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4073. Removed unused ApplicationACLsManager in ContainerManagerImpl constructor.
     (Naganarasimha G R via rohithsharmaks)
 
+    YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
+    (Hong Zhiguo via wangda)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -746,6 +746,11 @@ public class YarnConfiguration extends Configuration {
       + "proxy-user-privileges.enabled";
   public static final boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
 
+  /** The expiry interval for node IP caching. -1 disables the caching */
+  public static final String RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = RM_PREFIX
+      + "node-ip-cache.expiry-interval-secs";
+  public static final int DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = -1;
+
   /**
    * How many diagnostics/failure messages can be saved in RM for
    * log aggregation. It also defines the number of diagnostics/failure

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -272,6 +272,12 @@
     <value></value>
   </property>
 
+  <property>
+    <description>The expiry interval for node IP caching. -1 disables the caching</description>
+    <name>yarn.resourcemanager.node-ip-cache.expiry-interval-secs</name>
+    <value>-1</value>
+  </property>
+
   <property>
     <description>Number of threads to handle resource tracker calls.</description>
     <name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
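
Caching is disabled by default (-1). As a sketch of how a deployment could enable it programmatically instead of via the yarn-site.xml property above (the 300-second expiry is an illustrative value, not one suggested by the patch):

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class NodeIpCacheConfigExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // -1 (the default) keeps the old DirectResolver behavior; any positive
    // value makes NodesListManager use CachedResolver with this expiry.
    // 300 seconds is purely illustrative.
    conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 300);
    System.out.println(conf.getInt(
        YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS,
        YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS));
  }
}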

+ 139 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

@@ -24,13 +24,18 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -46,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 @SuppressWarnings("unchecked")
-public class NodesListManager extends AbstractService implements
+public class NodesListManager extends CompositeService implements
     EventHandler<NodesListManagerEvent> {
 
   private static final Log LOG = LogFactory.getLog(NodesListManager.class);
@@ -63,6 +70,8 @@ public class NodesListManager extends AbstractService implements
   private String includesFile;
   private String excludesFile;
 
+  private Resolver resolver;
+
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
     this.rmContext = rmContext;
@@ -73,6 +82,16 @@ public class NodesListManager extends AbstractService implements
 
     this.conf = conf;
 
+    int nodeIpCacheTimeout = conf.getInt(
+        YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS);
+    if (nodeIpCacheTimeout <= 0) {
+      resolver = new DirectResolver();
+    } else {
+      resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout);
+      addIfService(resolver);
+    }
+
     // Read the hosts/exclude files to restrict access to the RM
     try {
       this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
@@ -148,17 +167,129 @@ public class NodesListManager extends AbstractService implements
     ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
   }
 
+  @VisibleForTesting
+  public Resolver getResolver() {
+    return resolver;
+  }
+
+  @VisibleForTesting
+  public interface Resolver {
+    // try to resolve hostName to IP address, fallback to hostName if failed
+    String resolve(String hostName);
+  }
+
+  @VisibleForTesting
+  public static class DirectResolver implements Resolver {
+    @Override
+    public String resolve(String hostName) {
+      return NetUtils.normalizeHostName(hostName);
+    }
+  }
+
+  @VisibleForTesting
+  public static class CachedResolver extends AbstractService
+      implements Resolver {
+    private static class CacheEntry {
+      public String ip;
+      public long resolveTime;
+      public CacheEntry(String ip, long resolveTime) {
+        this.ip = ip;
+        this.resolveTime = resolveTime;
+      }
+    }
+    private Map<String, CacheEntry> cache =
+        new ConcurrentHashMap<String, CacheEntry>();
+    private int expiryIntervalMs;
+    private int checkIntervalMs;
+    private final Clock clock;
+    private Timer checkingTimer;
+    private TimerTask expireChecker = new ExpireChecker();
+
+    public CachedResolver(Clock clock, int expiryIntervalSecs) {
+      super("NodesListManager.CachedResolver");
+      this.clock = clock;
+      this.expiryIntervalMs = expiryIntervalSecs * 1000;
+      checkIntervalMs = expiryIntervalMs/3;
+      checkingTimer = new Timer(
+          "Timer-NodesListManager.CachedResolver.ExpireChecker", true);
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      checkingTimer.scheduleAtFixedRate(
+          expireChecker, checkIntervalMs, checkIntervalMs);
+      super.serviceStart();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      checkingTimer.cancel();
+      super.serviceStop();
+    }
+
+    @VisibleForTesting
+    public void addToCache(String hostName, String ip) {
+      cache.put(hostName, new CacheEntry(ip, clock.getTime()));
+    }
+
+    public void removeFromCache(String hostName) {
+      cache.remove(hostName);
+    }
+
+    private String reload(String hostName) {
+      String ip = NetUtils.normalizeHostName(hostName);
+      addToCache(hostName, ip);
+      return ip;
+    }
+
+    @Override
+    public String resolve(String hostName) {
+      CacheEntry e = cache.get(hostName);
+      if (e != null) {
+        return e.ip;
+      }
+      return reload(hostName);
+    }
+
+    @VisibleForTesting
+    public TimerTask getExpireChecker() {
+      return expireChecker;
+    }
+
+    private class ExpireChecker extends TimerTask {
+      @Override
+      public void run() {
+        long currentTime = clock.getTime();
+        Iterator<Map.Entry<String, CacheEntry>> iterator =
+            cache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<String, CacheEntry> entry = iterator.next();
+          if (currentTime >
+              entry.getValue().resolveTime +
+                  CachedResolver.this.expiryIntervalMs) {
+            iterator.remove();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("[" + entry.getKey() + ":" + entry.getValue().ip +
+                  "] Expired after " +
+                  CachedResolver.this.expiryIntervalMs / 1000 + " secs");
+            }
+          }
+        }
+      }
+    }
+  }
+
   public boolean isValidNode(String hostName) {
+    String ip = resolver.resolve(hostName);
     synchronized (hostsReader) {
       Set<String> hostsList = hostsReader.getHosts();
       Set<String> excludeList = hostsReader.getExcludedHosts();
-      String ip = NetUtils.normalizeHostName(hostName);
       return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
           .contains(ip))
           && !(excludeList.contains(hostName) || excludeList.contains(ip));
     }
   }
-  
+
   /**
    * Provides the currently unusable nodes. Copies it into provided collection.
    * @param unUsableNodes
@@ -207,6 +338,11 @@ public class NodesListManager extends AbstractService implements
     default:
       LOG.error("Ignoring invalid eventtype " + event.getType());
     }
+    // remove the cache of normalized hostname if enabled
+    if (resolver instanceof CachedResolver) {
+      ((CachedResolver)resolver).removeFromCache(
+          eventNode.getNodeID().getHost());
+    }
   }
 
   private void disableHostsFileReader(Exception ex) {
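
The core of CachedResolver above is a resolve-once-then-reuse map plus a timer task that evicts stale entries; the patch schedules its ExpireChecker every expiryIntervalMs / 3. The standalone sketch below models just that behavior. DnsStub, the hard-coded timestamps, and the 30-second expiry are illustrative stand-ins for NetUtils.normalizeHostName, the Clock, and the configured interval; they are not part of the patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachedResolverSketch {
  // Mirrors CacheEntry from the patch: the resolved IP plus when it was resolved.
  static class Entry {
    final String ip;
    final long resolveTime;
    Entry(String ip, long resolveTime) { this.ip = ip; this.resolveTime = resolveTime; }
  }

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();
  private final long expiryMs;

  CachedResolverSketch(long expiryMs) { this.expiryMs = expiryMs; }

  String resolve(String host, long now) {
    Entry e = cache.get(host);
    if (e != null) {
      return e.ip;                       // cache hit: no DNS lookup
    }
    String ip = DnsStub.lookup(host);    // stand-in for NetUtils.normalizeHostName
    cache.put(host, new Entry(ip, now));
    return ip;
  }

  // What ExpireChecker does on each timer tick.
  void expire(long now) {
    cache.entrySet().removeIf(e -> now > e.getValue().resolveTime + expiryMs);
  }

  static class DnsStub {
    static String lookup(String host) { return "192.0.2.1"; }  // fake DNS answer
  }

  public static void main(String[] args) {
    CachedResolverSketch r = new CachedResolverSketch(30_000);  // 30s expiry
    System.out.println(r.resolve("nm-host-1", 0));        // miss: resolves and caches
    System.out.println(r.resolve("nm-host-1", 10_000));   // hit: served from cache
    r.expire(40_000);                                      // entry older than 30s, evicted
    System.out.println(r.resolve("nm-host-1", 40_000));   // miss again after expiry
  }
}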

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java

@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -130,6 +132,106 @@ public class TestNodesListManager {
 
   }
 
+  @Test
+  public void testCachedResolver() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    final int CACHE_EXPIRY_INTERVAL_SECS = 30;
+    NodesListManager.CachedResolver resolver =
+        new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS);
+    resolver.init(new YarnConfiguration());
+    resolver.start();
+    resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+    Assert.assertEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+
+    resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+    Assert.assertEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    // test removeFromCache
+    resolver.removeFromCache("testCachedResolverHost1");
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    // test expiry
+    clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1);
+    resolver.getExpireChecker().run();
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertNotEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+  }
+
+  @Test
+  public void testDefaultResolver() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+
+    NodesListManager.Resolver resolver = nodesListManager.getResolver();
+    Assert.assertTrue("default resolver should be DirectResolver",
+        resolver instanceof NodesListManager.DirectResolver);
+  }
+
+  @Test
+  public void testCachedResolverWithEvent() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+    nodesListManager.init(conf);
+    nodesListManager.start();
+
+    NodesListManager.CachedResolver resolver =
+        (NodesListManager.CachedResolver)nodesListManager.getResolver();
+
+    resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+    resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+    Assert.assertEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+        1, "testCachedResolverHost1", 1234);
+    RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+        1, "testCachedResolverHost2", 1234);
+
+    nodesListManager.handle(
+        new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+            rmnode1));
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+    nodesListManager.handle(
+        new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+            rmnode2));
+    Assert.assertNotEquals("1.1.1.1",
+        resolver.resolve("testCachedResolverHost1"));
+    Assert.assertNotEquals("1.1.1.2",
+        resolver.resolve("testCachedResolverHost2"));
+
+  }
+
   /*
    * Create dispatcher object
    */