Browse Source

YARN-2328. FairScheduler: Verify update and continuous scheduling threads are stopped when the scheduler is stopped. (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1614435 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla 11 năm trước cách đây
mục cha
commit
d23e7d4acb

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2211. Persist AMRMToken master key in RMStateStore for RM recovery.
     (Xuan Gong via jianhe)
 
+    YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
+    stopped when the scheduler is stopped. (kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 37 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -139,8 +139,11 @@ public class FairScheduler extends
   private final int UPDATE_DEBUG_FREQUENCY = 5;
   private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
 
-  private Thread updateThread;
-  private Thread schedulingThread;
+  @VisibleForTesting
+  Thread updateThread;
+
+  @VisibleForTesting
+  Thread schedulingThread;
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
@@ -243,16 +246,21 @@ public class FairScheduler extends
   }
 
   /**
-   * A runnable which calls {@link FairScheduler#update()} every
+   * Thread which calls {@link FairScheduler#update()} every
    * <code>updateInterval</code> milliseconds.
    */
-  private class UpdateThread implements Runnable {
+  private class UpdateThread extends Thread {
+
+    @Override
     public void run() {
-      while (true) {
+      while (!Thread.currentThread().isInterrupted()) {
         try {
           Thread.sleep(updateInterval);
           update();
           preemptTasksIfNecessary();
+        } catch (InterruptedException ie) {
+          LOG.warn("Update thread interrupted. Exiting.");
+          return;
         } catch (Exception e) {
           LOG.error("Exception in fair scheduler UpdateThread", e);
         }
@@ -260,6 +268,26 @@ public class FairScheduler extends
     }
   }
 
+  /**
+   * Thread which attempts scheduling resources continuously,
+   * asynchronous to the node heartbeats.
+   */
+  private class ContinuousSchedulingThread extends Thread {
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          continuousSchedulingAttempt();
+          Thread.sleep(getContinuousSchedulingSleepMs());
+        } catch (InterruptedException e) {
+          LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
+          return;
+        }
+      }
+    }
+  }
+
   /**
    * Recompute the internal variables used by the scheduler - per-job weights,
    * fair shares, deficits, minimum slot allocations, and amount of used and
@@ -970,7 +998,7 @@ public class FairScheduler extends
     }
   }
 
-  void continuousSchedulingAttempt() {
+  void continuousSchedulingAttempt() throws InterruptedException {
     List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
     // Sort the nodes by space available on them, so that we offer
     // containers on emptier nodes first, facilitating an even spread. This
@@ -1229,30 +1257,14 @@ public class FairScheduler extends
       throw new IOException("Failed to start FairScheduler", e);
     }
 
-    updateThread = new Thread(new UpdateThread());
+    updateThread = new UpdateThread();
     updateThread.setName("FairSchedulerUpdateThread");
     updateThread.setDaemon(true);
 
     if (continuousSchedulingEnabled) {
       // start continuous scheduling thread
-      schedulingThread = new Thread(
-          new Runnable() {
-            @Override
-            public void run() {
-              while (!Thread.currentThread().isInterrupted()) {
-                try {
-                  continuousSchedulingAttempt();
-                  Thread.sleep(getContinuousSchedulingSleepMs());
-                } catch (InterruptedException e) {
-                  LOG.error("Continuous scheduling thread interrupted. Exiting. ",
-                      e);
-                  return;
-                }
-              }
-            }
-          }
-      );
-      schedulingThread.setName("ContinuousScheduling");
+      schedulingThread = new ContinuousSchedulingThread();
+      schedulingThread.setName("FairSchedulerContinuousScheduling");
       schedulingThread.setDaemon(true);
     }
 

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -3341,4 +3342,28 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         scheduler.findLowestCommonAncestorQueue(a1Queue, b1Queue);
     assertEquals(ancestorQueue, queue1);
   }
+
+  @Test
+  public void testThreadLifeCycle() throws InterruptedException {
+    conf.setBoolean(
+        FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
+    scheduler.init(conf);
+    scheduler.start();
+
+    Thread updateThread = scheduler.updateThread;
+    Thread schedulingThread = scheduler.schedulingThread;
+
+    assertTrue(updateThread.isAlive());
+    assertTrue(schedulingThread.isAlive());
+
+    scheduler.stop();
+
+    int numRetries = 100;
+    while (numRetries-- > 0 &&
+        (updateThread.isAlive() || schedulingThread.isAlive())) {
+      Thread.sleep(50);
+    }
+
+    assertNotEquals("One of the threads is still alive", 0, numRetries);
+  }
 }