|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.metrics2.impl;
|
|
|
|
|
|
import java.util.ConcurrentModificationException;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
import org.junit.Test;
|
|
|
import static org.junit.Assert.*;
|
|
@@ -32,8 +33,7 @@ import static org.apache.hadoop.metrics2.impl.SinkQueue.*;
|
|
|
* Test the half-blocking metrics sink queue
|
|
|
*/
|
|
|
public class TestSinkQueue {
|
|
|
-
|
|
|
- private final Log LOG = LogFactory.getLog(TestSinkQueue.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestSinkQueue.class);
|
|
|
|
|
|
/**
|
|
|
* Test common use case
|
|
@@ -48,7 +48,7 @@ public class TestSinkQueue {
|
|
|
|
|
|
assertTrue("should enqueue", q.enqueue(2));
|
|
|
q.consume(new Consumer<Integer>() {
|
|
|
- public void consume(Integer e) {
|
|
|
+ @Override public void consume(Integer e) {
|
|
|
assertEquals("element", 2, (int) e);
|
|
|
}
|
|
|
});
|
|
@@ -64,6 +64,11 @@ public class TestSinkQueue {
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
@Test public void testEmptyBlocking() throws Exception {
|
|
|
+ testEmptyBlocking(0);
|
|
|
+ testEmptyBlocking(100);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testEmptyBlocking(int awhile) throws Exception {
|
|
|
final SinkQueue<Integer> q = new SinkQueue<Integer>(2);
|
|
|
final Runnable trigger = mock(Runnable.class);
|
|
|
// try consuming emtpy equeue and blocking
|
|
@@ -72,7 +77,7 @@ public class TestSinkQueue {
|
|
|
try {
|
|
|
assertEquals("element", 1, (int) q.dequeue());
|
|
|
q.consume(new Consumer<Integer>() {
|
|
|
- public void consume(Integer e) {
|
|
|
+ @Override public void consume(Integer e) {
|
|
|
assertEquals("element", 2, (int) e);
|
|
|
trigger.run();
|
|
|
}
|
|
@@ -84,7 +89,10 @@ public class TestSinkQueue {
|
|
|
}
|
|
|
};
|
|
|
t.start();
|
|
|
- Thread.yield(); // Let the other block
|
|
|
+ // Should work with or without sleep
|
|
|
+ if (awhile > 0) {
|
|
|
+ Thread.sleep(awhile);
|
|
|
+ }
|
|
|
q.enqueue(1);
|
|
|
q.enqueue(2);
|
|
|
t.join();
|
|
@@ -104,7 +112,7 @@ public class TestSinkQueue {
|
|
|
|
|
|
q.enqueue(3);
|
|
|
q.consume(new Consumer<Integer>() {
|
|
|
- public void consume(Integer e) {
|
|
|
+ @Override public void consume(Integer e) {
|
|
|
assertEquals("element", 3, (int) e);
|
|
|
}
|
|
|
});
|
|
@@ -127,7 +135,7 @@ public class TestSinkQueue {
|
|
|
final Runnable trigger = mock(Runnable.class);
|
|
|
q.consumeAll(new Consumer<Integer>() {
|
|
|
private int expected = 0;
|
|
|
- public void consume(Integer e) {
|
|
|
+ @Override public void consume(Integer e) {
|
|
|
assertEquals("element", expected++, (int) e);
|
|
|
trigger.run();
|
|
|
}
|
|
@@ -147,7 +155,7 @@ public class TestSinkQueue {
|
|
|
|
|
|
try {
|
|
|
q.consume(new Consumer<Integer>() {
|
|
|
- public void consume(Integer e) {
|
|
|
+ @Override public void consume(Integer e) {
|
|
|
throw ex;
|
|
|
}
|
|
|
});
|
|
@@ -196,22 +204,22 @@ public class TestSinkQueue {
|
|
|
assertEquals("queue back", 2, (int) q.back());
|
|
|
assertTrue("should drop", !q.enqueue(3)); // should not block
|
|
|
shouldThrowCME(new Fun() {
|
|
|
- public void run() {
|
|
|
+ @Override public void run() {
|
|
|
q.clear();
|
|
|
}
|
|
|
});
|
|
|
shouldThrowCME(new Fun() {
|
|
|
- public void run() throws Exception {
|
|
|
+ @Override public void run() throws Exception {
|
|
|
q.consume(null);
|
|
|
}
|
|
|
});
|
|
|
shouldThrowCME(new Fun() {
|
|
|
- public void run() throws Exception {
|
|
|
+ @Override public void run() throws Exception {
|
|
|
q.consumeAll(null);
|
|
|
}
|
|
|
});
|
|
|
shouldThrowCME(new Fun() {
|
|
|
- public void run() throws Exception {
|
|
|
+ @Override public void run() throws Exception {
|
|
|
q.dequeue();
|
|
|
}
|
|
|
});
|
|
@@ -229,21 +237,26 @@ public class TestSinkQueue {
|
|
|
LOG.info(e);
|
|
|
return;
|
|
|
}
|
|
|
- fail("should've thrown");
|
|
|
+ LOG.error("should've thrown CME");
|
|
|
+ fail("should've thrown CME");
|
|
|
}
|
|
|
|
|
|
private SinkQueue<Integer> newSleepingConsumerQueue(int capacity,
|
|
|
- int... values) {
|
|
|
+ int... values) throws Exception {
|
|
|
final SinkQueue<Integer> q = new SinkQueue<Integer>(capacity);
|
|
|
for (int i : values) {
|
|
|
q.enqueue(i);
|
|
|
}
|
|
|
+ final CountDownLatch barrier = new CountDownLatch(1);
|
|
|
Thread t = new Thread() {
|
|
|
@Override public void run() {
|
|
|
try {
|
|
|
+ Thread.sleep(10); // causes failure without barrier
|
|
|
q.consume(new Consumer<Integer>() {
|
|
|
+ @Override
|
|
|
public void consume(Integer e) throws InterruptedException {
|
|
|
LOG.info("sleeping");
|
|
|
+ barrier.countDown();
|
|
|
Thread.sleep(1000 * 86400); // a long time
|
|
|
}
|
|
|
});
|
|
@@ -256,7 +269,7 @@ public class TestSinkQueue {
|
|
|
t.setName("Sleeping consumer");
|
|
|
t.setDaemon(true); // so jvm can exit
|
|
|
t.start();
|
|
|
- Thread.yield(); // Let the consumer consume
|
|
|
+ barrier.await();
|
|
|
LOG.debug("Returning new sleeping consumer queue");
|
|
|
return q;
|
|
|
}
|
|
@@ -264,5 +277,4 @@ public class TestSinkQueue {
|
|
|
static interface Fun {
|
|
|
void run() throws Exception;
|
|
|
}
|
|
|
-
|
|
|
}
|