瀏覽代碼

HADOOP-16581. Revise ValueQueue to correctly replenish queues that go below the watermark (#1463)

(cherry picked from dd0834696a694564af65a1355c9d13275f44df51)
Yuval Degani 5 年之前
父節點
當前提交
e31594f20e

+ 8 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java

@@ -379,13 +379,15 @@ public class ValueQueue <E> {
           if (numToFill > 0) {
             refiller.fillQueueForKey(keyName, ekvs, numToFill);
           }
-          // Asynch task to fill > lowWatermark
-          if (i <= (int) (lowWatermark * numValues)) {
-            submitRefillTask(keyName, keyQueue);
-          }
-          return ekvs;
+
+          break;
+        } else {
+          ekvs.add(val);
         }
-        ekvs.add(val);
+      }
+      // Schedule a refill task in case queue has gone below the watermark
+      if (keyQueue.size() < (int) (lowWatermark * numValues)) {
+        submitRefillTask(keyName, keyQueue);
       }
     } catch (Exception e) {
       throw new IOException("Exception while contacting value generator ", e);

+ 123 - 36
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key;
 import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -31,7 +32,6 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Sets;
 
 public class TestValueQueue {
@@ -62,6 +62,18 @@ public class TestValueQueue {
     }
   }
 
+  private void waitForRefill(ValueQueue<?> valueQueue, String queueName, int queueSize)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      int size = valueQueue.getSize(queueName);
+      if (size != queueSize) {
+        LOG.info("Current ValueQueue size is " + size);
+        return false;
+      }
+      return true;
+    }, 100, 3000);
+  }
+
   /**
    * Verifies that Queue is initially filled to "numInitValues"
    */
@@ -69,7 +81,7 @@ public class TestValueQueue {
   public void testInitFill() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.1f, 300, 1,
+        new ValueQueue<String>(10, 0.1f, 30000, 1,
             SyncGenerationPolicy.ALL, filler);
     Assert.assertEquals("test", vq.getNext("k1"));
     Assert.assertEquals(1, filler.getTop().num);
@@ -83,7 +95,7 @@ public class TestValueQueue {
   public void testWarmUp() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.5f, 300, 1,
+        new ValueQueue<String>(10, 0.5f, 30000, 1,
             SyncGenerationPolicy.ALL, filler);
     vq.initializeQueuesForKeys("k1", "k2", "k3");
     FillInfo[] fillInfos =
@@ -106,14 +118,17 @@ public class TestValueQueue {
   public void testRefill() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.1f, 300, 1,
+        new ValueQueue<String>(100, 0.1f, 30000, 1,
             SyncGenerationPolicy.ALL, filler);
+    // Trigger a prefill (10) and an async refill (91)
     Assert.assertEquals("test", vq.getNext("k1"));
-    Assert.assertEquals(1, filler.getTop().num);
-    // Trigger refill
-    vq.getNext("k1");
-    Assert.assertEquals(1, filler.getTop().num);
     Assert.assertEquals(10, filler.getTop().num);
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 100);
+    // Refill task should add 91 values to get to a full queue (10 produced by
+    // the prefill to the low watermark, 1 consumed by getNext())
+    Assert.assertEquals(91, filler.getTop().num);
     vq.shutdown();
   }
 
@@ -125,10 +140,27 @@ public class TestValueQueue {
   public void testNoRefill() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.5f, 300, 1,
+        new ValueQueue<String>(10, 0.5f, 30000, 1,
             SyncGenerationPolicy.ALL, filler);
+    // Trigger a prefill (5) and an async refill (6)
     Assert.assertEquals("test", vq.getNext("k1"));
     Assert.assertEquals(5, filler.getTop().num);
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 6 values to get to a full queue (5 produced by
+    // the prefill to the low watermark, 1 consumed by getNext())
+    Assert.assertEquals(6, filler.getTop().num);
+
+    // Take another value, queue is still above the watermark
+    Assert.assertEquals("test", vq.getNext("k1"));
+
+    // Wait a while to make sure that no async refills are triggered
+    try {
+      waitForRefill(vq, "k1", 10);
+    } catch (TimeoutException ignored) {
+      // This is the correct outcome - no refill is expected
+    }
     Assert.assertEquals(null, filler.getTop());
     vq.shutdown();
   }
@@ -140,11 +172,29 @@ public class TestValueQueue {
   public void testgetAtMostPolicyALL() throws Exception {
     MockFiller filler = new MockFiller();
     final ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.1f, 300, 1,
+        new ValueQueue<String>(10, 0.1f, 30000, 1,
             SyncGenerationPolicy.ALL, filler);
+    // Trigger a prefill (1) and an async refill (10)
     Assert.assertEquals("test", vq.getNext("k1"));
     Assert.assertEquals(1, filler.getTop().num);
 
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 10 values to get to a full queue (1 produced by
+    // the prefill to the low watermark, 1 consumed by getNext())
+    Assert.assertEquals(10, filler.getTop().num);
+
+    // Drain completely, no further refills triggered
+    vq.drain("k1");
+
+    // Wait a while to make sure that no async refills are triggered
+    try {
+      waitForRefill(vq, "k1", 10);
+    } catch (TimeoutException ignored) {
+      // This is the correct outcome - no refill is expected
+    }
+    Assert.assertNull(filler.getTop());
+
     // Synchronous call:
     // 1. Synchronously fill returned list
     // 2. Start another async task to fill the queue in the cache
@@ -154,23 +204,16 @@ public class TestValueQueue {
         filler.getTop().num);
 
     // Wait for the async task to finish
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-          int size = vq.getSize("k1");
-          if (size != 10) {
-            LOG.info("Current ValueQueue size is " + size);
-            return false;
-          }
-          return true;
-      }
-    }, 100, 3000);
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 10 values to get to a full queue
     Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
 
     // Drain completely after filled by the async thread
-    Assert.assertEquals("Failed to drain completely after async.", 10,
-        vq.getAtMost("k1", 10).size());
-    // Synchronous call (No Async call since num > lowWatermark)
+    vq.drain("k1");
+    Assert.assertEquals("Failed to drain completely after async.", 0,
+        vq.getSize("k1"));
+
+    // Synchronous call
     Assert.assertEquals("Failed to get all 19.", 19,
         vq.getAtMost("k1", 19).size());
     Assert.assertEquals("Failed in sync call.", 19, filler.getTop().num);
@@ -184,14 +227,29 @@ public class TestValueQueue {
   public void testgetAtMostPolicyATLEAST_ONE() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.3f, 300, 1,
+        new ValueQueue<String>(10, 0.3f, 30000, 1,
             SyncGenerationPolicy.ATLEAST_ONE, filler);
+    // Trigger a prefill (3) and an async refill (8)
     Assert.assertEquals("test", vq.getNext("k1"));
     Assert.assertEquals(3, filler.getTop().num);
-    // Drain completely
-    Assert.assertEquals(2, vq.getAtMost("k1", 10).size());
-    // Asynch Refill call
-    Assert.assertEquals(10, filler.getTop().num);
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 8 values to get to a full queue (3 produced by
+    // the prefill to the low watermark, 1 consumed by getNext())
+    Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);
+
+    // Drain completely, no further refills triggered
+    vq.drain("k1");
+
+    // Queue is empty, sync will return a single value and trigger a refill
+    Assert.assertEquals(1, vq.getAtMost("k1", 10).size());
+    Assert.assertEquals(1, filler.getTop().num);
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 10 values to get to a full queue
+    Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
     vq.shutdown();
   }
 
@@ -202,16 +260,29 @@ public class TestValueQueue {
   public void testgetAtMostPolicyLOW_WATERMARK() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.3f, 300, 1,
+        new ValueQueue<String>(10, 0.3f, 30000, 1,
             SyncGenerationPolicy.LOW_WATERMARK, filler);
+    // Trigger a prefill (3) and an async refill (8)
     Assert.assertEquals("test", vq.getNext("k1"));
     Assert.assertEquals(3, filler.getTop().num);
-    // Drain completely
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 8 values to get to a full queue (3 produced by
+    // the prefill to the low watermark, 1 consumed by getNext())
+    Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);
+
+    // Drain completely, no further refills triggered
+    vq.drain("k1");
+
+    // Queue is empty, sync will return 3 values and trigger a refill
     Assert.assertEquals(3, vq.getAtMost("k1", 10).size());
-    // Synchronous call
-    Assert.assertEquals(1, filler.getTop().num);
-    // Asynch Refill call
-    Assert.assertEquals(10, filler.getTop().num);
+    Assert.assertEquals(3, filler.getTop().num);
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 10 values to get to a full queue
+    Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
     vq.shutdown();
   }
 
@@ -219,11 +290,27 @@ public class TestValueQueue {
   public void testDrain() throws Exception {
     MockFiller filler = new MockFiller();
     ValueQueue<String> vq =
-        new ValueQueue<String>(10, 0.1f, 300, 1,
+        new ValueQueue<String>(10, 0.1f, 30000, 1,
             SyncGenerationPolicy.ALL, filler);
+    // Trigger a prefill (1) and an async refill (10)
     Assert.assertEquals("test", vq.getNext("k1"));
     Assert.assertEquals(1, filler.getTop().num);
+
+    // Wait for the async task to finish
+    waitForRefill(vq, "k1", 10);
+    // Refill task should add 10 values to get to a full queue (1 produced by
+    // the prefill to the low watermark, 1 consumed by getNext())
+    Assert.assertEquals(10, filler.getTop().num);
+
+    // Drain completely, no further refills triggered
     vq.drain("k1");
+
+    // Wait a while to make sure that no async refills are triggered
+    try {
+      waitForRefill(vq, "k1", 10);
+    } catch (TimeoutException ignored) {
+      // This is the correct outcome - no refill is expected
+    }
     Assert.assertNull(filler.getTop());
     vq.shutdown();
   }