|
@@ -24,13 +24,13 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.junit.Test;
|
|
|
import static org.junit.Assert.*;
|
|
|
import static org.mockito.Mockito.*;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
/**
|
|
|
* Test the half-blocking metrics sink queue
|
|
|
*/
|
|
|
public class TestSinkQueue {
|
|
|
-
|
|
|
- private final Log LOG = LogFactory.getLog(TestSinkQueue.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestSinkQueue.class);
|
|
|
|
|
|
/**
|
|
|
* Test common use case
|
|
@@ -61,6 +61,11 @@ public class TestSinkQueue {
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
@Test public void testEmptyBlocking() throws Exception {
|
|
|
+ testEmptyBlocking(0);
|
|
|
+ testEmptyBlocking(100);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testEmptyBlocking(int awhile) throws Exception {
|
|
|
final SinkQueue<Integer> q = new SinkQueue<Integer>(2);
|
|
|
final Runnable trigger = mock(Runnable.class);
|
|
|
// try consuming emtpy equeue and blocking
|
|
@@ -81,7 +86,10 @@ public class TestSinkQueue {
|
|
|
}
|
|
|
};
|
|
|
t.start();
|
|
|
- Thread.yield(); // Let the other block
|
|
|
+ // Should work with or without sleep
|
|
|
+ if (awhile > 0) {
|
|
|
+ Thread.sleep(awhile);
|
|
|
+ }
|
|
|
q.enqueue(1);
|
|
|
q.enqueue(2);
|
|
|
t.join();
|
|
@@ -226,21 +234,25 @@ public class TestSinkQueue {
|
|
|
LOG.info(e);
|
|
|
return;
|
|
|
}
|
|
|
- fail("should've thrown");
|
|
|
+ LOG.error("should've thrown CME");
|
|
|
+ fail("should've thrown CME");
|
|
|
}
|
|
|
|
|
|
- private SinkQueue<Integer>
|
|
|
- newSleepingConsumerQueue(int capacity, int... values) {
|
|
|
+ private SinkQueue<Integer> newSleepingConsumerQueue(int capacity,
|
|
|
+ int... values) throws Exception {
|
|
|
final SinkQueue<Integer> q = new SinkQueue<Integer>(capacity);
|
|
|
for (int i : values) {
|
|
|
q.enqueue(i);
|
|
|
}
|
|
|
+ final CountDownLatch barrier = new CountDownLatch(1);
|
|
|
Thread t = new Thread() {
|
|
|
@Override public void run() {
|
|
|
try {
|
|
|
+ Thread.sleep(10); // causes failure without barrier
|
|
|
q.consume(new Consumer<Integer>() {
|
|
|
public void consume(Integer e) throws InterruptedException {
|
|
|
LOG.info("sleeping");
|
|
|
+ barrier.countDown();
|
|
|
Thread.sleep(1000 * 86400); // a long time
|
|
|
}
|
|
|
});
|
|
@@ -253,7 +265,7 @@ public class TestSinkQueue {
|
|
|
t.setName("Sleeping consumer");
|
|
|
t.setDaemon(true); // so jvm can exit
|
|
|
t.start();
|
|
|
- Thread.yield(); // Let the consumer consume
|
|
|
+ barrier.await();
|
|
|
LOG.debug("Returning new sleeping consumer queue");
|
|
|
return q;
|
|
|
}
|