소스 검색

HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements drop nearly impossible. Contributed by Zhihai Xu.

Andrew Wang 9 년 전
부모
커밋
6736a1ab70

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -719,6 +719,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12161. Add getStoragePolicy API to the FileSystem interface.
     (Brahma Reddy Battula via Arpit Agarwal)
 
+    HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements
+    drop nearly impossible. (Zhihai Xu via wang)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

+ 18 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -32,11 +32,15 @@ import org.apache.hadoop.conf.Configuration;
  */
 public class CallQueueManager<E> {
   public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
+  // Number of checkpoints for empty queue.
+  private static final int CHECKPOINT_NUM = 20;
+  // Interval to check empty queue.
+  private static final long CHECKPOINT_INTERVAL_MS = 10;
 
   @SuppressWarnings("unchecked")
   static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
-      Class<?> queneClass, Class<E> elementClass) {
-    return (Class<? extends BlockingQueue<E>>)queneClass;
+      Class<?> queueClass, Class<E> elementClass) {
+    return (Class<? extends BlockingQueue<E>>)queueClass;
   }
   private final boolean clientBackOffEnabled;
 
@@ -159,18 +163,23 @@ public class CallQueueManager<E> {
   }
 
   /**
-   * Checks if queue is empty by checking at two points in time.
+   * Checks if queue is empty by checking at CHECKPOINT_NUM points with
+   * CHECKPOINT_INTERVAL_MS interval.
    * This doesn't mean the queue might not fill up at some point later, but
    * it should decrease the probability that we lose a call this way.
    */
   private boolean queueIsReallyEmpty(BlockingQueue<?> q) {
-    boolean wasEmpty = q.isEmpty();
-    try {
-      Thread.sleep(10);
-    } catch (InterruptedException ie) {
-      return false;
+    for (int i = 0; i < CHECKPOINT_NUM; i++) {
+      try {
+        Thread.sleep(CHECKPOINT_INTERVAL_MS);
+      } catch (InterruptedException ie) {
+        return false;
+      }
+      if (!q.isEmpty()) {
+        return false;
+      }
     }
-    return q.isEmpty() && wasEmpty;
+    return true;
   }
 
   private String stringRepr(Object o) {

+ 3 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java

@@ -165,7 +165,7 @@ public class TestCallQueueManager {
     HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>();
 
     // Create putters and takers
-    for (int i=0; i < 50; i++) {
+    for (int i=0; i < 1000; i++) {
       Putter p = new Putter(manager, -1, -1);
       Thread pt = new Thread(p);
       producers.add(p);
@@ -174,7 +174,7 @@ public class TestCallQueueManager {
       pt.start();
     }
 
-    for (int i=0; i < 20; i++) {
+    for (int i=0; i < 100; i++) {
       Taker t = new Taker(manager, -1, -1);
       Thread tt = new Thread(t);
       consumers.add(t);
@@ -183,7 +183,7 @@ public class TestCallQueueManager {
       tt.start();
     }
 
-    Thread.sleep(10);
+    Thread.sleep(500);
 
     for (int i=0; i < 5; i++) {
       manager.swapQueue(queueClass, 5000, "", null);