瀏覽代碼

YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.

Eric Payne 9 年之前
父節點
當前提交
f0c278469b

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java

@@ -237,7 +237,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
   protected void verifyConnections() throws InterruptedException,
       YarnException {
     assertTrue("NMs failed to connect to the RM",
-        cluster.waitForNodeManagersToConnect(20000));
+        cluster.waitForNodeManagersToConnect(5000));
     verifyClientConnection();
   }
 
@@ -299,7 +299,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     cluster.resetStartFailoverFlag(false);
     cluster.init(conf);
     cluster.start();
-    getAdminService(0).transitionToActive(req);
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
     verifyConnections();
 

+ 0 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java

@@ -162,7 +162,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     cluster.init(conf);
     cluster.start();
-    getAdminService(0).transitionToActive(req);
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
     verifyConnections();
 
@@ -251,7 +250,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     cluster.init(conf);
     cluster.start();
-    getAdminService(0).transitionToActive(req);
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
     verifyConnections();
 

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java

@@ -1132,6 +1132,25 @@ public class TestYarnClient {
     try {
       cluster.init(conf);
       cluster.start();
+
+      int attempts;
+      for(attempts = 10; attempts > 0; attempts--) {
+        if (cluster.getResourceManager().getRMContext().getReservationSystem()
+            .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
+            .getMemory() > 0) {
+          break;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      if (attempts <= 0) {
+        Assert.fail("Exhausted attempts in checking if node capacity was "
+            + "added to the plan");
+      }
+
       final Configuration yarnConf = cluster.getConfig();
       client = YarnClient.createYarnClient();
       client.init(yarnConf);

+ 25 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -83,6 +83,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
 
   private final Object heartbeatMonitor = new Object();
+  private final Object shutdownMonitor = new Object();
 
   private final Context context;
   private final Dispatcher dispatcher;
@@ -205,27 +206,34 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   @Override
   protected void serviceStop() throws Exception {
-    // Interrupt the updater.
-    this.isStopped = true;
-    stopRMProxy();
-    super.serviceStop();
+    synchronized(shutdownMonitor) {
+      // Interrupt the updater.
+      this.isStopped = true;
+      stopRMProxy();
+      super.serviceStop();
+    }
   }
 
   protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
     // Interrupt the updater.
-    this.isStopped = true;
-
-    try {
-      statusUpdater.join();
-      registerWithRM();
-      statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
-      this.isStopped = false;
-      statusUpdater.start();
-      LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
-    } catch (Exception e) {
-      String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
-      LOG.error(errorMessage, e);
-      throw new YarnRuntimeException(e);
+    synchronized(shutdownMonitor) {
+      if(this.isStopped) {
+        LOG.info("Currently being shutdown. Aborting reboot");
+        return;
+      }
+      this.isStopped = true;
+      try {
+        statusUpdater.join();
+        registerWithRM();
+        statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
+        statusUpdater.start();
+        this.isStopped = false;
+        LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
+      } catch (Exception e) {
+        String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
+        LOG.error(errorMessage, e);
+        throw new YarnRuntimeException(e);
+      }
     }
   }
 

+ 26 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -270,6 +273,12 @@ public class MiniYARNCluster extends CompositeService {
         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
   }
 
+  @Override
+  protected synchronized void serviceStart() throws Exception {
+    super.serviceStart();
+    this.waitForNodeManagersToConnect(5000);
+  }
+
   private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
     String hostname = MiniYARNCluster.getHostname();
     conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
@@ -307,19 +316,7 @@ public class MiniYARNCluster extends CompositeService {
 
   private synchronized void startResourceManager(final int index) {
     try {
-      Thread rmThread = new Thread() {
-        public void run() {
-          resourceManagers[index].start();
-        }
-      };
-      rmThread.setName("RM-" + index);
-      rmThread.start();
-      int waitCount = 0;
-      while (resourceManagers[index].getServiceState() == STATE.INITED
-          && waitCount++ < 60) {
-        LOG.info("Waiting for RM to start...");
-        Thread.sleep(1500);
-      }
+      resourceManagers[index].start();
       if (resourceManagers[index].getServiceState() != STATE.STARTED) {
         // RM could have failed.
         throw new IOException(
@@ -448,6 +445,11 @@ public class MiniYARNCluster extends CompositeService {
     @Override
     protected synchronized void serviceStart() throws Exception {
       startResourceManager(index);
+      if(index == 0) {
+          resourceManagers[index].getRMContext().getRMAdminService()
+            .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED));
+      }
       LOG.info("MiniYARN ResourceManager address: " +
                getConfig().get(YarnConfiguration.RM_ADDRESS));
       LOG.info("MiniYARN ResourceManager web address: " +
@@ -555,26 +557,12 @@ public class MiniYARNCluster extends CompositeService {
     }
 
     protected synchronized void serviceStart() throws Exception {
-      try {
-        new Thread() {
-          public void run() {
-            nodeManagers[index].start();
-          }
-        }.start();
-        int waitCount = 0;
-        while (nodeManagers[index].getServiceState() == STATE.INITED
-            && waitCount++ < 60) {
-          LOG.info("Waiting for NM " + index + " to start...");
-          Thread.sleep(1000);
-        }
-        if (nodeManagers[index].getServiceState() != STATE.STARTED) {
-          // RM could have failed.
-          throw new IOException("NodeManager " + index + " failed to start");
-        }
-        super.serviceStart();
-      } catch (Throwable t) {
-        throw new YarnRuntimeException(t);
+      nodeManagers[index].start();
+      if (nodeManagers[index].getServiceState() != STATE.STARTED) {
+        // NM could have failed.
+        throw new IOException("NodeManager " + index + " failed to start");
       }
+      super.serviceStart();
     }
 
     @Override
@@ -650,7 +638,7 @@ public class MiniYARNCluster extends CompositeService {
   /**
    * Wait for all the NodeManagers to connect to the ResourceManager.
    *
-   * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
+   * @param timeout Maximum time to wait, in milliseconds (polls every 10 ms).
    * @return true if all NodeManagers connect to the (Active)
    * ResourceManager, false otherwise.
    * @throws YarnException
@@ -659,16 +647,17 @@ public class MiniYARNCluster extends CompositeService {
   public boolean waitForNodeManagersToConnect(long timeout)
       throws YarnException, InterruptedException {
     GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-    for (int i = 0; i < timeout / 100; i++) {
+    for (int i = 0; i < timeout / 10; i++) {
       ResourceManager rm = getResourceManager();
       if (rm == null) {
         throw new YarnException("Can not find the active RM.");
       }
       else if (nodeManagers.length == rm.getClientRMService()
-            .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
+          .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
+        LOG.info("All Node Managers connected in MiniYARNCluster");
         return true;
       }
-      Thread.sleep(100);
+      Thread.sleep(10);
     }
     return false;
   }
@@ -701,17 +690,7 @@ public class MiniYARNCluster extends CompositeService {
     @Override
     protected synchronized void serviceStart() throws Exception {
       try {
-        new Thread() {
-          public void run() {
-            appHistoryServer.start();
-          };
-        }.start();
-        int waitCount = 0;
-        while (appHistoryServer.getServiceState() == STATE.INITED
-            && waitCount++ < 60) {
-          LOG.info("Waiting for Timeline Server to start...");
-          Thread.sleep(1500);
-        }
+        appHistoryServer.start();
         if (appHistoryServer.getServiceState() != STATE.STARTED) {
           // AHS could have failed.
           throw new IOException(

+ 0 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java

@@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA {
     cluster.init(conf);
     cluster.start();
 
-    cluster.getResourceManager(0).getRMContext().getRMAdminService()
-        .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
-            HAServiceProtocol.RequestSource.REQUEST_BY_USER));
-
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
   }