1
0
Просмотр исходного кода

YARN-513. Create common proxy client for communicating with RM (Xuan Gong & Jian He via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503933 13f79535-47bb-0310-9956-ffa450edef68
Bikas Saha 12 лет назад
Родитель
Сommit
33b5a81509
16 измененных файлов с 433 добавлено и 266 удалено
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
  4. 0 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
  5. 3 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
  6. 9 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
  7. 2 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
  8. 125 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
  9. 55 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
  10. 10 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
  11. 21 117
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  12. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
  13. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
  14. 115 52
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  15. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  16. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -482,6 +482,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-521. Augment AM - RM client module to be able to request containers
     only at specific locations (Sandy Ryza via bikas)
 
+    YARN-513. Create common proxy client for communicating with RM. (Xuan Gong
+    & Jian He via bikas)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -655,17 +655,17 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
       2000;
 
-  /** Max time to wait to establish a connection to RM when NM starts
+  /** Max time to wait to establish a connection to RM
    */
-  public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
-      NM_PREFIX + "resourcemanager.connect.wait.secs";
-  public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
+  public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS =
+      RM_PREFIX + "resourcemanager.connect.max.wait.secs";
+  public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS =
       15*60;
 
-  /** Time interval between each NM attempt to connect to RM
+  /** Time interval between each attempt to connect to RM
    */
   public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
-      NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
+      RM_PREFIX + "resourcemanager.connect.retry_interval.secs";
   public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
       = 30;
 

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java

@@ -0,0 +1,65 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+
+public class ClientRMProxy<T> extends RMProxy<T>{
+
+  private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
+
+  public static <T> T createRMProxy(final Configuration conf,
+      final Class<T> protocol) throws IOException {
+    InetSocketAddress rmAddress = getRMAddress(conf, protocol);
+    return createRMProxy(conf, protocol, rmAddress);
+  }
+
+  private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
+    if (protocol == ApplicationClientProtocol.class) {
+      return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_PORT);
+    } else if (protocol == ResourceManagerAdministrationProtocol.class) {
+      return conf.getSocketAddr(
+          YarnConfiguration.RM_ADMIN_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
+    } else if (protocol == ApplicationMasterProtocol.class) {
+      return conf.getSocketAddr(
+          YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    } else {
+      String message = "Unsupported protocol found when creating the proxy " +
+          "connection to ResourceManager: " +
+          ((protocol != null) ? protocol.getClass().getName() : "null");
+      LOG.error(message);
+      throw new IllegalStateException(message);
+    }
+  }
+}

+ 0 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.client.api;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Set;
 
@@ -54,25 +53,6 @@ public abstract class YarnClient extends AbstractService {
     return client;
   }
 
-  /**
-   * Create a new instance of YarnClient.
-   */
-  @Public
-  public static YarnClient createYarnClient(InetSocketAddress rmAddress) {
-    YarnClient client = new YarnClientImpl(rmAddress);
-    return client;
-  }
-
-  /**
-   * Create a new instance of YarnClient.
-   */
-  @Public
-  public static YarnClient createYarnClient(String name,
-      InetSocketAddress rmAddress) {
-    YarnClient client = new YarnClientImpl(name, rmAddress);
-    return client;
-  }
-
   @Private
   protected YarnClient(String name) {
     super(name);

+ 3 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -19,8 +19,6 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -42,7 +40,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -56,16 +53,16 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -171,28 +168,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   @Override
   protected void serviceStart() throws Exception {
     final YarnConfiguration conf = new YarnConfiguration(getConfig());
-    final YarnRPC rpc = YarnRPC.create(conf);
-    final InetSocketAddress rmAddress = conf.getSocketAddr(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
-    UserGroupInformation currentUser;
     try {
-      currentUser = UserGroupInformation.getCurrentUser();
+      rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }
-
-    // CurrentUser should already have AMToken loaded.
-    rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
-      @Override
-      public ApplicationMasterProtocol run() {
-        return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress,
-            conf);
-      }
-    });
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
     super.serviceStart();
   }
 

+ 9 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -59,11 +59,12 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Records;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -81,16 +82,7 @@ public class YarnClientImpl extends YarnClient {
   private static final String ROOT = "root";
 
   public YarnClientImpl() {
-    this(null);
-  }
-  
-  public YarnClientImpl(InetSocketAddress rmAddress) {
-    this(YarnClientImpl.class.getName(), rmAddress);
-  }
-
-  public YarnClientImpl(String name, InetSocketAddress rmAddress) {
-    super(name);
-    this.rmAddress = rmAddress;
+    super(YarnClientImpl.class.getName());
   }
 
   private static InetSocketAddress getRmAddress(Configuration conf) {
@@ -100,9 +92,7 @@ public class YarnClientImpl extends YarnClient {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    if (this.rmAddress == null) {
-      this.rmAddress = getRmAddress(conf);
-    }
+    this.rmAddress = getRmAddress(conf);
     statePollIntervalMillis = conf.getLong(
         YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
         YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
@@ -111,12 +101,11 @@ public class YarnClientImpl extends YarnClient {
 
   @Override
   protected void serviceStart() throws Exception {
-    YarnRPC rpc = YarnRPC.create(getConfig());
-
-    this.rmClient = (ApplicationClientProtocol) rpc.getProxy(
-        ApplicationClientProtocol.class, rmAddress, getConfig());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    try {
+      rmClient = ClientRMProxy.createRMProxy(getConfig(),
+            ApplicationClientProtocol.class);
+    } catch (IOException e) {
+      throw new YarnRuntimeException(e);
     }
     super.serviceStart();
   }

+ 2 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java

@@ -19,8 +19,6 @@
 package org.apache.hadoop.yarn.client.cli;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -31,11 +29,11 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -164,32 +162,10 @@ public class RMAdminCLI extends Configured implements Tool {
     }
   }
 
-  private static UserGroupInformation getUGI(Configuration conf
-  ) throws IOException {
-    return UserGroupInformation.getCurrentUser();
-  }
-
   private ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
     // Get the current configuration
     final YarnConfiguration conf = new YarnConfiguration(getConf());
-
-    // Create the client
-    final InetSocketAddress addr = conf.getSocketAddr(
-        YarnConfiguration.RM_ADMIN_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
-    final YarnRPC rpc = YarnRPC.create(conf);
-    
-    ResourceManagerAdministrationProtocol adminProtocol =
-      getUGI(conf).doAs(new PrivilegedAction<ResourceManagerAdministrationProtocol>() {
-        @Override
-        public ResourceManagerAdministrationProtocol run() {
-          return (ResourceManagerAdministrationProtocol) rpc.getProxy(ResourceManagerAdministrationProtocol.class,
-              addr, conf);
-        }
-      });
-
-    return adminProtocol;
+    return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
   }
   
   private int refreshQueues() throws IOException, YarnException {

+ 125 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RMProxy<T> {
+
+  private static final Log LOG = LogFactory.getLog(RMProxy.class);
+
+  @SuppressWarnings("unchecked")
+  public static <T> T createRMProxy(final Configuration conf,
+      final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
+    RetryPolicy retryPolicy = createRetryPolicy(conf);
+    T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected static <T> T getProxy(final Configuration conf,
+      final Class<T> protocol, final InetSocketAddress rmAddress)
+      throws IOException {
+    return (T) UserGroupInformation.getCurrentUser().doAs(
+      new PrivilegedAction<Object>() {
+
+        @Override
+        public T run() {
+          return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
+        }
+      });
+  }
+
+  public static RetryPolicy createRetryPolicy(Configuration conf) {
+    long rmConnectWaitMS =
+        conf.getInt(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
+            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS)
+        * 1000;
+    long rmConnectionRetryIntervalMS =
+        conf.getLong(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+            YarnConfiguration
+            .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
+        * 1000;
+
+    if (rmConnectionRetryIntervalMS < 0) {
+      throw new YarnRuntimeException("Invalid Configuration. " +
+          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
+          " should not be negative.");
+    }
+
+    boolean waitForEver = (rmConnectWaitMS == -1000);
+
+    if (waitForEver) {
+      return  RetryPolicies.RETRY_FOREVER;
+    } else {
+      if (rmConnectWaitMS < 0) {
+        throw new YarnRuntimeException("Invalid Configuration. "
+            + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS
+            + " can be -1, but can not be other negative numbers");
+      }
+
+      // try connect once
+      if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
+        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS
+            + " is smaller than "
+            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+            + ". Only try connect once.");
+        rmConnectWaitMS = 0;
+      }
+    }
+
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
+            rmConnectionRetryIntervalMS,
+            TimeUnit.MILLISECONDS);
+
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
+    //TO DO: after HADOOP-9576,  IOException can be changed to EOFException
+    exceptionToPolicyMap.put(IOException.class, retryPolicy);
+
+    return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      exceptionToPolicyMap);
+  }
+}

+ 55 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java

@@ -0,0 +1,55 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.client.RMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class ServerRMProxy<T> extends RMProxy<T>{
+
+  private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
+
+  public static <T> T createRMProxy(final Configuration conf,
+      final Class<T> protocol) throws IOException {
+    InetSocketAddress rmAddress = getRMAddress(conf, protocol);
+    return createRMProxy(conf, protocol, rmAddress);
+  }
+
+  private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
+    if (protocol == ResourceTracker.class) {
+      return conf.getSocketAddr(
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    }
+    else {
+      String message = "Unsupported protocol found when creating the proxy " +
+          "connection to ResourceManager: " +
+          ((protocol != null) ? protocol.getClass().getName() : "null");
+      LOG.error(message);
+      throw new IllegalStateException(message);
+    }
+  }
+}

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.api.impl.pb.client;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeMan
 
 import com.google.protobuf.ServiceException;
 
-public class ResourceTrackerPBClientImpl implements ResourceTracker {
+public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
 
 private ResourceTrackerPB proxy;
   
@@ -50,7 +51,14 @@ private ResourceTrackerPB proxy;
     proxy = (ResourceTrackerPB)RPC.getProxy(
         ResourceTrackerPB.class, clientVersion, addr, conf);
   }
-  
+
+  @Override
+  public void close() {
+    if(this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
   @Override
   public RegisterNodeManagerResponse registerNodeManager(
       RegisterNodeManagerRequest request) throws YarnException,

+ 21 - 117
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,9 +48,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -77,7 +78,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private NodeId nodeId;
   private long nextHeartBeatInterval;
   private ResourceTracker resourceTracker;
-  private InetSocketAddress rmAddress;
   private Resource totalResource;
   private int httpPort;
   private volatile boolean isStopped;
@@ -91,9 +91,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
-  private long rmConnectWaitMS;
-  private long rmConnectionRetryIntervalMS;
-  private boolean waitForEver;
 
   private Runnable statusUpdaterRunnable;
   private Thread  statusUpdater;
@@ -110,11 +107,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    this.rmAddress = conf.getSocketAddr(
-        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
-
     int memoryMb = 
         conf.getInt(
             YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
@@ -153,6 +145,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     try {
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
+      this.resourceTracker = getRMClient();
       registerWithRM();
       super.serviceStart();
       startStatusUpdater();
@@ -167,6 +160,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   protected void serviceStop() throws Exception {
     // Interrupt the updater.
     this.isStopped = true;
+    stopRMProxy();
     super.serviceStop();
   }
 
@@ -188,6 +182,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
   }
 
+  @VisibleForTesting
+  protected void stopRMProxy() {
+    if(this.resourceTracker != null) {
+      RPC.stopProxy(this.resourceTracker);
+    }
+  }
+
   @Private
   protected boolean isTokenKeepAliveEnabled(Configuration conf) {
     return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -195,93 +196,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         && UserGroupInformation.isSecurityEnabled();
   }
 
-  protected ResourceTracker getRMClient() {
+  @VisibleForTesting
+  protected ResourceTracker getRMClient() throws IOException {
     Configuration conf = getConfig();
-    YarnRPC rpc = YarnRPC.create(conf);
-    return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
-        conf);
+    return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
   }
 
   @VisibleForTesting
   protected void registerWithRM() throws YarnException, IOException {
-    Configuration conf = getConfig();
-    rmConnectWaitMS =
-        conf.getInt(
-            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
-            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
-        * 1000;
-    rmConnectionRetryIntervalMS =
-        conf.getLong(
-            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
-            YarnConfiguration
-                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
-        * 1000;
-
-    if(rmConnectionRetryIntervalMS < 0) {
-      throw new YarnRuntimeException("Invalid Configuration. " +
-          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
-          " should not be negative.");
-    }
-
-    waitForEver = (rmConnectWaitMS == -1000);
-
-    if(! waitForEver) {
-      if(rmConnectWaitMS < 0) {
-          throw new YarnRuntimeException("Invalid Configuration. " +
-              YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
-              " can be -1, but can not be other negative numbers");
-      }
-
-      //try connect once
-      if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
-        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
-            + " is smaller than "
-            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
-            + ". Only try connect once.");
-        rmConnectWaitMS = 0;
-      }
-    }
-
-    int rmRetryCount = 0;
-    long waitStartTime = System.currentTimeMillis();
-
     RegisterNodeManagerRequest request =
         recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
     request.setNodeId(this.nodeId);
-    RegisterNodeManagerResponse regNMResponse;
-
-    while(true) {
-      try {
-        rmRetryCount++;
-        LOG.info("Connecting to ResourceManager at " + this.rmAddress
-            + ". current no. of attempts is " + rmRetryCount);
-        this.resourceTracker = getRMClient();
-        regNMResponse =
-            this.resourceTracker.registerNodeManager(request);
-        this.rmIdentifier = regNMResponse.getRMIdentifier();
-        break;
-      } catch(Throwable e) {
-        LOG.warn("Trying to connect to ResourceManager, " +
-            "current no. of failed attempts is "+rmRetryCount);
-        if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
-            || waitForEver) {
-          try {
-            LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
-                + " seconds before next connection retry to RM");
-            Thread.sleep(rmConnectionRetryIntervalMS);
-          } catch(InterruptedException ex) {
-            //done nothing
-          }
-        } else {
-          String errorMessage = "Failed to Connect to RM, " +
-              "no. of failed attempts is "+rmRetryCount;
-          LOG.error(errorMessage,e);
-          throw new YarnRuntimeException(errorMessage,e);
-        }
-      }
-    }
+    RegisterNodeManagerResponse regNMResponse =
+        resourceTracker.registerNodeManager(request);
+    this.rmIdentifier = regNMResponse.getRMIdentifier();
     // if the Resourcemanager instructs NM to shutdown.
     if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
       String message =
@@ -426,8 +356,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
           // Send heartbeat
           try {
             NodeHeartbeatResponse response = null;
-            int rmRetryCount = 0;
-            long waitStartTime = System.currentTimeMillis();
             NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
             nodeStatus.setResponseId(lastHeartBeatID);
             
@@ -440,31 +368,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             request
               .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
                 .getNMTokenSecretManager().getCurrentKey());
-            while (!isStopped) {
-              try {
-                rmRetryCount++;
-                response = resourceTracker.nodeHeartbeat(request);
-                break;
-              } catch (Throwable e) {
-                LOG.warn("Trying to heartbeat to ResourceManager, "
-                    + "current no. of failed attempts is " + rmRetryCount);
-                if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
-                    || waitForEver) {
-                  try {
-                    LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
-                        + " seconds before next heartbeat to RM");
-                    Thread.sleep(rmConnectionRetryIntervalMS);
-                  } catch(InterruptedException ex) {
-                    //done nothing
-                  }
-                } else {
-                  String errorMessage = "Failed to heartbeat to RM, " +
-                      "no. of failed attempts is "+rmRetryCount;
-                  LOG.error(errorMessage,e);
-                  throw new YarnRuntimeException(errorMessage,e);
-                }
-              }
-            }
+            response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             updateMasterKeys(response);
@@ -508,11 +412,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
-          } catch (YarnRuntimeException e) {
+          } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(
                 new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
-            throw e;
+            throw new YarnRuntimeException(e);
           } catch (Throwable e) {
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java

@@ -61,6 +61,10 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
   protected ResourceTracker getRMClient() {
     return resourceTracker;
   }
+  @Override
+  protected void stopRMProxy() {
+    return;
+  }
   
   private static class MockResourceTracker implements ResourceTracker {
     private int heartBeatID;

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java

@@ -107,6 +107,11 @@ public class TestEventFlow {
         return new LocalRMInterface();
       };
 
+          @Override
+          protected void stopRMProxy() {
+            return;
+          }
+
       @Override
       protected void startStatusUpdater() {
         return; // Don't start any updating thread.

+ 115 - 52
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -41,6 +41,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.ServiceOperations;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -60,9 +63,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -103,11 +106,17 @@ public class TestNodeStatusUpdater {
   volatile int heartBeatID = 0;
   volatile Throwable nmStartError = null;
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
-  private final Configuration conf = createNMConfig();
+  private boolean triggered = false;
+  private Configuration conf;
   private NodeManager nm;
   private boolean containerStatusBackupSuccessfully = true;
   private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
 
+  @Before
+  public void setUp() {
+    conf = createNMConfig();
+  }
+
   @After
   public void tearDown() {
     this.registeredNodes.clear();
@@ -274,6 +283,11 @@ public class TestNodeStatusUpdater {
     protected ResourceTracker getRMClient() {
       return resourceTracker;
     }
+
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
   }
 
   private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
@@ -290,6 +304,10 @@ public class TestNodeStatusUpdater {
       return resourceTracker;
     }
 
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
   }
 
   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@@ -307,7 +325,12 @@ public class TestNodeStatusUpdater {
     protected ResourceTracker getRMClient() {
       return resourceTracker;
     }
-    
+
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
+
     @Override
     protected boolean isTokenKeepAliveEnabled(Configuration conf) {
       return true;
@@ -315,21 +338,16 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
-    public ResourceTracker resourceTracker =
-        new MyResourceTracker(this.context);
+
     private Context context;
-    private long waitStartTime;
     private final long rmStartIntervalMS;
     private final boolean rmNeverStart;
-    private volatile boolean triggered = false;
-    private long durationWhenTriggered = -1;
-
+    public ResourceTracker resourceTracker;
     public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
         long rmStartIntervalMS, boolean rmNeverStart) {
       super(context, dispatcher, healthChecker, metrics);
       this.context = context;
-      this.waitStartTime = System.currentTimeMillis();
       this.rmStartIntervalMS = rmStartIntervalMS;
       this.rmNeverStart = rmNeverStart;
     }
@@ -337,25 +355,16 @@ public class TestNodeStatusUpdater {
     @Override
     protected void serviceStart() throws Exception {
       //record the startup time
-      this.waitStartTime = System.currentTimeMillis();
       super.serviceStart();
     }
 
     @Override
-    protected ResourceTracker getRMClient() {
-      if (!triggered) {
-        long t = System.currentTimeMillis();
-        long duration = t - waitStartTime;
-        if (duration <= rmStartIntervalMS
-            || rmNeverStart) {
-          throw new YarnRuntimeException("Faking RM start failure as start " +
-                                  "delay timer has not expired.");
-        } else {
-          //triggering
-          triggered = true;
-          durationWhenTriggered = duration;
-        }
-      }
+    protected ResourceTracker getRMClient() throws IOException {
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+      resourceTracker =
+          (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+            new MyResourceTracker6(this.context, rmStartIntervalMS,
+              rmNeverStart), retryPolicy);
       return resourceTracker;
     }
 
@@ -363,37 +372,35 @@ public class TestNodeStatusUpdater {
       return triggered;
     }
 
-    private long getWaitStartTime() {
-      return waitStartTime;
-    }
-
-    private long getDurationWhenTriggered() {
-      return durationWhenTriggered;
-    }
-
     @Override
-    public String toString() {
-      return "MyNodeStatusUpdater4{" +
-             "rmNeverStart=" + rmNeverStart +
-             ", triggered=" + triggered +
-             ", duration=" + durationWhenTriggered +
-             ", rmStartIntervalMS=" + rmStartIntervalMS +
-             '}';
+    protected void stopRMProxy() {
+      return;
     }
   }
 
+
+
   private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
     private ResourceTracker resourceTracker;
+    private Configuration conf;
 
     public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
-        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) {
       super(context, dispatcher, healthChecker, metrics);
       resourceTracker = new MyResourceTracker5();
+      this.conf = conf;
     }
 
     @Override
     protected ResourceTracker getRMClient() {
-      return resourceTracker;
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+      return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+        resourceTracker, retryPolicy);
+    }
+
+    @Override
+    protected void stopRMProxy() {
+      return;
     }
   }
 
@@ -417,15 +424,18 @@ public class TestNodeStatusUpdater {
     public boolean isStopped = false;
     private NodeStatusUpdater nodeStatusUpdater;
     private CyclicBarrier syncBarrier;
-    public MyNodeManager2 (CyclicBarrier syncBarrier) {
+    private Configuration conf;
+
+    public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) {
       this.syncBarrier = syncBarrier;
+      this.conf = conf;
     }
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
       nodeStatusUpdater =
           new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
-                                     metrics);
+                                     metrics, conf);
       return nodeStatusUpdater;
     }
 
@@ -577,7 +587,7 @@ public class TestNodeStatusUpdater {
               .get(4).getState() == ContainerState.RUNNING
               && request.getNodeStatus().getContainersStatuses().get(4)
                   .getContainerId().getId() == 5);
-          throw new YarnRuntimeException("Lost the heartbeat response");
+          throw new java.net.ConnectException("Lost the heartbeat response");
         } else if (heartBeatID == 2) {
           Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
               .size(), 7);
@@ -646,7 +656,63 @@ public class TestNodeStatusUpdater {
     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnException, IOException {
       heartBeatID++;
-      throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+      throw new java.net.ConnectException(
+          "NodeHeartbeat exception");
+    }
+  }
+
+  private class MyResourceTracker6 implements ResourceTracker {
+
+    private final Context context;
+    private long rmStartIntervalMS;
+    private boolean rmNeverStart;
+    private final long waitStartTime;
+
+    public MyResourceTracker6(Context context, long rmStartIntervalMS,
+        boolean rmNeverStart) {
+      this.context = context;
+      this.rmStartIntervalMS = rmStartIntervalMS;
+      this.rmNeverStart = rmNeverStart;
+      this.waitStartTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnException, IOException,
+        IOException {
+      if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
+          || rmNeverStart) {
+        throw new java.net.ConnectException("Faking RM start failure as start "
+            + "delay timer has not expired.");
+      } else {
+        NodeId nodeId = request.getNodeId();
+        Resource resource = request.getResource();
+        LOG.info("Registering " + nodeId.toString());
+        // NOTE: this really should be checking against the config value
+        InetSocketAddress expected = NetUtils.getConnectAddress(
+            conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
+        Assert.assertEquals(NetUtils.getHostPortString(expected),
+            nodeId.toString());
+        Assert.assertEquals(5 * 1024, resource.getMemory());
+        registeredNodes.add(nodeId);
+
+        RegisterNodeManagerResponse response = recordFactory
+            .newRecordInstance(RegisterNodeManagerResponse.class);
+        triggered = true;
+        return response;
+      }
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnException, IOException {
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID++);
+
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+          newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
+              null, null, null, 1000L);
+      return nhResponse;
     }
   }
 
@@ -843,8 +909,7 @@ public class TestNodeStatusUpdater {
     final long connectionRetryIntervalSecs = 1;
     //Waiting for rmStartIntervalMS, RM will be started
     final long rmStartIntervalMS = 2*1000;
-    YarnConfiguration conf = createNMConfig();
-    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
         connectionWaitSecs);
     conf.setLong(YarnConfiguration
         .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
@@ -907,8 +972,6 @@ public class TestNodeStatusUpdater {
     }
     long duration = System.currentTimeMillis() - waitStartTime;
     MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
-    Assert.assertTrue("Updater was never started",
-                      myUpdater.getWaitStartTime()>0);
     Assert.assertTrue("NM started before updater triggered",
                       myUpdater.isTriggered());
     Assert.assertTrue("NM should have connected to RM after "
@@ -1037,13 +1100,13 @@ public class TestNodeStatusUpdater {
     final long connectionWaitSecs = 1;
     final long connectionRetryIntervalSecs = 1;
     YarnConfiguration conf = createNMConfig();
-    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
         connectionWaitSecs);
     conf.setLong(YarnConfiguration
         .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
         connectionRetryIntervalSecs);
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
-    nm = new MyNodeManager2(syncBarrier);
+    nm = new MyNodeManager2(syncBarrier, conf);
     nm.init(conf);
     nm.start();
     try {

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -117,6 +117,11 @@ public abstract class BaseContainerManagerTest {
       return new LocalRMInterface();
     };
 
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
+
     @Override
     protected void startStatusUpdater() {
       return; // Don't start any updating thread.

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -390,6 +390,11 @@ public class MiniYARNCluster extends CompositeService {
             }
           };
         };
+
+        @Override
+        protected void stopRMProxy() {
+          return;
+        }
       };
     };
   }