瀏覽代碼

HADOOP-14912. FairCallQueue may defer servicing calls. Contributed by Daryn Sharp

(cherry picked from commit 1123f8f0b62292197f5433cd40e66d8620044608)
Jason Lowe 7 年之前
父節點
當前提交
c0d56cad5c

+ 5 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java

@@ -122,13 +122,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   private E removeNextElement() {
     int priority = multiplexer.getAndAdvanceCurrentIndex();
     E e = queues.get(priority).poll();
-    if (e == null) {
+    // a semaphore permit has been acquired, so an element MUST be extracted
+    // or the semaphore and queued elements will go out of sync.  loop to
+    // avoid race condition if elements are added behind the current position,
+    // awakening other threads that poll the elements ahead of our position.
+    while (e == null) {
       for (int idx = 0; e == null && idx < queues.size(); idx++) {
         e = queues.get(idx).poll();
       }
     }
-    // guaranteed to find an element if caller acquired permit.
-    assert e != null : "consumer didn't acquire semaphore!";
     return e;
   }