Browse Source

YARN-6448. Continuous scheduling thread crashes while sorting nodes. (Yufei Gu via kasha)

(cherry picked from commit b4c4f365948d36b36942f912ef994c1c21ba59e3)
Karthik Kambatla 8 years ago
parent
commit
7c0f9bd5e7

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -330,7 +331,8 @@ public abstract class SchedulerNode {
    * container.
    * container.
    * @param resource Resources to deduct.
    * @param resource Resources to deduct.
    */
    */
-  private synchronized void deductUnallocatedResource(Resource resource) {
+  @VisibleForTesting
+  public synchronized void deductUnallocatedResource(Resource resource) {
     if (resource == null) {
     if (resource == null) {
       LOG.error("Invalid deduction of null resource for "
       LOG.error("Invalid deduction of null resource for "
           + rmNode.getNodeAddress());
           + rmNode.getNodeAddress());

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -919,8 +919,12 @@ public class FairScheduler extends
 
 
   void continuousSchedulingAttempt() throws InterruptedException {
   void continuousSchedulingAttempt() throws InterruptedException {
     long start = getClock().getTime();
     long start = getClock().getTime();
-    List<FSSchedulerNode> nodeIdList =
-        nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
+    List<FSSchedulerNode> nodeIdList;
+    // Hold the scheduler lock while sorting so that a node's unallocated
+    // resources cannot change mid-sort and break the comparator's ordering.
+    synchronized (this) {
+      nodeIdList = nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
+    }
 
 
     // iterate all nodes
     // iterate all nodes
     for (FSSchedulerNode node : nodeIdList) {
     for (FSSchedulerNode node : nodeIdList) {

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -57,6 +59,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 
 public class TestContinuousScheduling extends FairSchedulerTestBase {
 public class TestContinuousScheduling extends FairSchedulerTestBase {
   private ControlledClock mockClock;
   private ControlledClock mockClock;
@@ -302,6 +305,39 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     assertNotEquals("One of the threads is still alive", 0, numRetries);
     assertNotEquals("One of the threads is still alive", 0, numRetries);
   }
   }
 
 
+  @Test
+  public void TestNodeAvailableResourceComparatorTransitivity() {
+    final ClusterNodeTracker<FSSchedulerNode> clusterNodeTracker =
+        scheduler.getNodeTracker();
+
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(2, 4000, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      clusterNodeTracker.addNode(new FSSchedulerNode(rmNode, false));
+    }
+
+    // Simulate concurrent changes to the nodes' unallocated resources
+    new Thread() {
+      @Override
+      public void run() {
+        for (int j = 0; j < 100; j++) {
+          for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) {
+            int i = ThreadLocalRandom.current().nextInt(-30, 30);
+            synchronized (scheduler) {
+              node.deductUnallocatedResource(Resource.newInstance(i * 1024, i));
+            }
+          }
+        }
+      }
+    }.start();
+
+    try {
+      scheduler.continuousSchedulingAttempt();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
   @Test
   @Test
   public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
   public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
     scheduler.start();
     scheduler.start();