
YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when AMs heartbeat in. Contributed by Jason Lowe.
svn merge --ignore-ancestry -c 1605616 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605617 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli authored 11 years ago
commit 8317fd5de6
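The change replaces a plain `int numNodeManagers` guarded by the scheduler's monitor with an `AtomicInteger`, so `getNumClusterNodes()` can be read without taking the CapacityScheduler lock during AM allocate heartbeats. The following is a minimal standalone sketch of that pattern, not the actual CapacityScheduler code; the class and method names are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: track the node count with an AtomicInteger so that
// readers never need the scheduler's instance lock.
public class NodeCountSketch {
  private final AtomicInteger numNodeManagers = new AtomicInteger(0);

  // Read path (e.g. while answering an AM heartbeat): lock-free.
  public int getNumClusterNodes() {
    return numNodeManagers.get();
  }

  // Write paths still run under the scheduler's own synchronization in the
  // real code; using the value returned by incrementAndGet()/decrementAndGet()
  // instead of re-reading the field keeps the "first node added" and
  // "last node removed" checks consistent with the update.
  public synchronized void addNode() {
    int numNodes = numNodeManagers.incrementAndGet();
    if (numNodes == 1) {
      // e.g. begin asynchronous scheduling
    }
  }

  public synchronized void removeNode() {
    int numNodes = numNodeManagers.decrementAndGet();
    if (numNodes == 0) {
      // e.g. suspend asynchronous scheduling
    }
  }
}
```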

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -177,6 +177,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2152. Added missing information into ContainerTokenIdentifier so that
     NodeManagers can report the same to RM when RM restarts. (Jian He via vinodkv)
 
+    YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
+    AMs heartbeat in. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 9 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -180,7 +182,7 @@ public class CapacityScheduler extends
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private int numNodeManagers = 0;
+  private AtomicInteger numNodeManagers = new AtomicInteger(0);
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
@@ -236,8 +238,8 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public synchronized int getNumClusterNodes() {
-    return numNodeManagers;
+  public int getNumClusterNodes() {
+    return numNodeManagers.get();
   }
 
   @Override
@@ -953,11 +955,11 @@ public class CapacityScheduler extends
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
-    ++numNodeManagers;
+    int numNodes = numNodeManagers.incrementAndGet();
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodeManagers == 1) {
+    if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
@@ -969,9 +971,9 @@ public class CapacityScheduler extends
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
-    --numNodeManagers;
+    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodeManagers == 0) {
+    if (scheduleAsynchronously && numNodes == 0) {
       asyncSchedulerThread.suspendSchedule();
     }
 

+ 142 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -25,15 +25,29 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,13 +60,20 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -686,4 +707,125 @@ public class TestCapacityScheduler {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRMWithAMS rm =
+        new MockRMWithAMS(conf, containerManager);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, "*");
+    RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
+
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+    int msecToWait = 10000;
+    int msecToSleep = 100;
+    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
+        && msecToWait > 0) {
+      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+          + "Current state is " + attempt.getAppAttemptState());
+      Thread.sleep(msecToSleep);
+      msecToWait -= msecToSleep;
+    }
+    Assert.assertEquals(attempt.getAppAttemptState(),
+        RMAppAttemptState.LAUNCHED);
+
+    // Create a client to the RM.
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
+    Credentials credentials = containerManager.getContainerCredentials();
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) rpc.getProxy(
+              ApplicationMasterProtocol.class, rmBindAddress, conf);
+          }
+        });
+
+    RegisterApplicationMasterRequest request =
+        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
+    client.registerApplicationMaster(request);
+
+    // grab the scheduler lock from another thread
+    // and verify an allocate call in this thread doesn't block on it
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Thread otherThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        synchronized(cs) {
+          try {
+            barrier.await();
+            barrier.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } catch (BrokenBarrierException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    });
+    otherThread.start();
+    barrier.await();
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0.0f, null, null, null);
+    client.allocate(allocateRequest);
+    barrier.await();
+    otherThread.join();
+
+    rm.stop();
+  }
+
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    assertEquals(0, cs.getNumClusterNodes());
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+    assertEquals(2, cs.getNumClusterNodes());
+
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(1, cs.getNumClusterNodes());
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    assertEquals(2, cs.getNumClusterNodes());
+    cs.handle(new NodeRemovedSchedulerEvent(n2));
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(0, cs.getNumClusterNodes());
+
+    cs.stop();
+  }
 }