Browse Source

YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. Contributed by Rohith Sharmaks.

(cherry picked from commit ddc5be48fc35868abf7f59088f747c636e76a42a)
(cherry picked from commit c116743bdda2b1792bf872020a5e2b14d772ac60)
(cherry picked from commit 3c9d26ae14625de3e9437c07eceda0d05f1985b2)
Tsuyoshi Ozawa 10 years ago
parent
commit
03f9ac2de7

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -60,6 +60,9 @@ Release 2.6.1 - UNRELEASED
     YARN-2992. ZKRMStateStore crashes due to session expiry. (Karthik Kambatla
     via jianhe)
 
+    YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
+    (Rohith Sharmaks via ozawa)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -1888,7 +1888,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   // return a single Resource capturing the overall amount of pending resources
-  public Resource getTotalResourcePending() {
+  public synchronized Resource getTotalResourcePending() {
     Resource ret = BuilderUtils.newResource(0, 0);
     for (FiCaSchedulerApp f : activeApplications) {
       Resources.addTo(ret, f.getTotalPendingRequests());
@@ -1897,7 +1897,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public void collectSchedulerApplications(
+  public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
     for (FiCaSchedulerApp pendingApp : pendingApplications) {
       apps.add(pendingApp.getApplicationAttemptId());

+ 86 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -37,11 +37,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -2353,6 +2356,89 @@ public class TestLeafQueue {
     }
   }
 
+  /**
+   * Regression test for YARN-2922: reading the applications of a
+   * {@link LeafQueue} while another thread submits/finishes application
+   * attempts must not raise {@link ConcurrentModificationException}.
+   *
+   * One thread submits 10000 application attempts to the "default" queue,
+   * waits on a two-party barrier, and then finishes all of them. A second
+   * thread waits on the same barrier and then calls
+   * {@code collectSchedulerApplications}, so the collection overlaps with
+   * the removals. Any ConcurrentModificationException seen by the reader is
+   * recorded and fails the test.
+   *
+   * @throws Exception if the RM cannot be started or a join is interrupted
+   */
+  @Test
+  public void testConcurrentAccess() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM();
+    rm.init(conf);
+    rm.start();
+
+    // Stop the RM even when the assertion fails or a join is interrupted,
+    // so a failing run does not leave a running RM behind.
+    try {
+      final String queue = "default";
+      final String user = "user";
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      final LeafQueue defaultQueue = (LeafQueue) cs.getQueue(queue);
+
+      final List<FiCaSchedulerApp> listOfApps =
+          createListOfApps(10000, user, defaultQueue);
+
+      // Two parties: the writer thread and the reader thread.
+      final CyclicBarrier cb = new CyclicBarrier(2);
+      // Written only by the reader thread and read by this thread after
+      // join(), which provides the required happens-before ordering.
+      final List<ConcurrentModificationException> conException =
+          new ArrayList<ConcurrentModificationException>();
+
+      Thread submitAndRemove = new Thread(new Runnable() {
+
+        @Override
+        public void run() {
+
+          for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
+            defaultQueue.submitApplicationAttempt(fiCaSchedulerApp, user);
+          }
+          try {
+            cb.await();
+          } catch (Exception e) {
+            // Ignore: a broken barrier only loses the overlap between the
+            // two threads; the removals below still have to run.
+          }
+          for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
+            defaultQueue.finishApplicationAttempt(fiCaSchedulerApp, queue);
+          }
+        }
+      }, "SubmitAndRemoveApplicationAttempt Thread");
+
+      Thread getAppsInQueue = new Thread(new Runnable() {
+        List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
+
+        @Override
+        public void run() {
+          try {
+            try {
+              cb.await();
+            } catch (Exception e) {
+              // Ignore: proceed with the collection even without the barrier.
+            }
+            defaultQueue.collectSchedulerApplications(apps);
+          } catch (ConcurrentModificationException e) {
+            conException.add(e);
+          }
+        }
+
+      }, "GetAppsInQueue Thread");
+
+      submitAndRemove.start();
+      getAppsInQueue.start();
+
+      submitAndRemove.join();
+      getAppsInQueue.join();
+
+      assertTrue("ConcurrentModificationException is thrown",
+          conException.isEmpty());
+    } finally {
+      rm.stop();
+    }
+
+  }
+
+  private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
+      LeafQueue defaultQueue) {
+    List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();
+    for (int i = 0; i < noOfApps; i++) {
+      ApplicationAttemptId appAttemptId_0 =
+          TestUtils.getMockApplicationAttemptId(i, 0);
+      FiCaSchedulerApp app_0 =
+          new FiCaSchedulerApp(appAttemptId_0, user, defaultQueue,
+              mock(ActiveUsersManager.class), spyRMContext);
+      appsLists.add(app_0);
+    }
+    return appsLists;
+  }
+
   private CapacitySchedulerContext mockCSContext(
       CapacitySchedulerConfiguration csConf, Resource clusterResource) {
     CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);