|
@@ -18,13 +18,19 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
+import static org.mockito.Mockito.anyInt;
|
|
|
+import static org.mockito.Mockito.anyObject;
|
|
|
+import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import javax.management.MBeanServer;
|
|
|
import javax.management.ObjectName;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
import java.lang.management.ManagementFactory;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -34,7 +40,10 @@ import java.util.List;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
|
|
|
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
|
|
|
|
|
public class TestFairCallQueue extends TestCase {
|
|
|
private FairCallQueue<Schedulable> fcq;
|
|
@@ -133,6 +142,153 @@ public class TestFairCallQueue extends TestCase {
|
|
|
assertNull(fcq.poll());
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked") // for mock reset.
|
|
|
+ @Test
|
|
|
+ public void testInsertion() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // 3 queues, 2 slots each.
|
|
|
+ fcq = Mockito.spy(new FairCallQueue<Schedulable>(3, 6, "ns", conf));
|
|
|
+
|
|
|
+ Schedulable p0 = mockCall("a", 0);
|
|
|
+ Schedulable p1 = mockCall("b", 1);
|
|
|
+ Schedulable p2 = mockCall("c", 2);
|
|
|
+
|
|
|
+ // add to first queue.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ fcq.add(p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(1, p0);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0);
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ // 0:x- 1:-- 2:--
|
|
|
+
|
|
|
+ // add to second queue.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ fcq.add(p1);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(0, p1);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p1);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(2, p1);
|
|
|
+ // 0:x- 1:x- 2:--
|
|
|
+
|
|
|
+ // add to first queue.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ fcq.add(p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(1, p0);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0);
|
|
|
+ // 0:xx 1:x- 2:--
|
|
|
+
|
|
|
+ // add to first full queue spills over to second.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ fcq.add(p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0);
|
|
|
+ // 0:xx 1:xx 2:--
|
|
|
+
|
|
|
+ // add to second full queue spills over to third.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ fcq.add(p1);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(0, p1);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p1);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(2, p1);
|
|
|
+ // 0:xx 1:xx 2:x-
|
|
|
+
|
|
|
+ // add to first and second full queue spills over to third.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ fcq.add(p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(2, p0);
|
|
|
+ // 0:xx 1:xx 2:xx
|
|
|
+
|
|
|
+ // adding non-lowest priority with all queues full throws a
|
|
|
+ // non-disconnecting rpc server exception.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ try {
|
|
|
+ fcq.add(p0);
|
|
|
+ fail("didn't fail");
|
|
|
+ } catch (IllegalStateException ise) {
|
|
|
+ checkOverflowException(ise, RpcStatusProto.ERROR);
|
|
|
+ }
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(2, p0);
|
|
|
+
|
|
|
+ // adding non-lowest priority with all queues full throws a
|
|
|
+ // non-disconnecting rpc server exception.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ try {
|
|
|
+ fcq.add(p1);
|
|
|
+ fail("didn't fail");
|
|
|
+ } catch (IllegalStateException ise) {
|
|
|
+ checkOverflowException(ise, RpcStatusProto.ERROR);
|
|
|
+ }
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(0, p1);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p1);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(2, p1);
|
|
|
+
|
|
|
+ // adding lowest priority with all queues full throws a
|
|
|
+ // fatal disconnecting rpc server exception.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ try {
|
|
|
+ fcq.add(p2);
|
|
|
+ fail("didn't fail");
|
|
|
+ } catch (IllegalStateException ise) {
|
|
|
+ checkOverflowException(ise, RpcStatusProto.FATAL);
|
|
|
+ }
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(0, p2);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(1, p2);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(2, p2);
|
|
|
+ Mockito.reset(fcq);
|
|
|
+
|
|
|
+ // used to abort what would be a blocking operation.
|
|
|
+ Exception stopPuts = new RuntimeException();
|
|
|
+
|
|
|
+ // put should offer to all but last subqueue, only put to last subqueue.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ try {
|
|
|
+ doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
|
|
|
+ fcq.put(p0);
|
|
|
+ fail("didn't fail");
|
|
|
+ } catch (Exception e) {
|
|
|
+ assertSame(stopPuts, e);
|
|
|
+ }
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(0, p0);
|
|
|
+ Mockito.verify(fcq, times(1)).offerQueue(1, p0);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(2, p0); // expect put, not offer.
|
|
|
+ Mockito.verify(fcq, times(1)).putQueue(2, p0);
|
|
|
+
|
|
|
+ // put with lowest priority should not offer, just put.
|
|
|
+ Mockito.reset(fcq);
|
|
|
+ try {
|
|
|
+ doThrow(stopPuts).when(fcq).putQueue(anyInt(), anyObject());
|
|
|
+ fcq.put(p2);
|
|
|
+ fail("didn't fail");
|
|
|
+ } catch (Exception e) {
|
|
|
+ assertSame(stopPuts, e);
|
|
|
+ }
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(0, p2);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(1, p2);
|
|
|
+ Mockito.verify(fcq, times(0)).offerQueue(2, p2);
|
|
|
+ Mockito.verify(fcq, times(1)).putQueue(2, p2);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkOverflowException(Exception ex, RpcStatusProto status) {
|
|
|
+ // should be an overflow exception
|
|
|
+ assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
|
|
|
+ ex instanceof CallQueueOverflowException);
|
|
|
+ IOException ioe = ((CallQueueOverflowException)ex).getCause();
|
|
|
+ assertNotNull(ioe);
|
|
|
+ assertTrue(ioe.getClass().getName() + " != RpcServerException",
|
|
|
+ ioe instanceof RpcServerException);
|
|
|
+ RpcServerException rse = (RpcServerException)ioe;
|
|
|
+ // check error/fatal status and if it embeds a retriable ex.
|
|
|
+ assertEquals(status, rse.getRpcStatusProto());
|
|
|
+ assertTrue(rse.getClass().getName() + " != RetriableException",
|
|
|
+ rse.getCause() instanceof RetriableException);
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// Ensure that FairCallQueue properly implements BlockingQueue
|
|
|
//
|