|
@@ -28,9 +28,12 @@ import javax.management.ObjectName;
|
|
|
import java.lang.management.ManagementFactory;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
-
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.junit.Test;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
public class TestFairCallQueue extends TestCase {
|
|
@@ -43,6 +46,7 @@ public class TestFairCallQueue extends TestCase {
|
|
|
when(ugi.getUserName()).thenReturn(id);
|
|
|
when(mockCall.getUserGroupInformation()).thenReturn(ugi);
|
|
|
when(mockCall.getPriorityLevel()).thenReturn(priority);
|
|
|
+ when(mockCall.toString()).thenReturn("id=" + id + " priority=" + priority);
|
|
|
|
|
|
return mockCall;
|
|
|
}
|
|
@@ -78,6 +82,57 @@ public class TestFairCallQueue extends TestCase {
|
|
|
assertEquals(fairCallQueue.remainingCapacity(), 1025);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testPrioritization() {
|
|
|
+ int numQueues = 10;
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ fcq = new FairCallQueue<Schedulable>(numQueues, numQueues, "ns", conf);
|
|
|
+
|
|
|
+ //Schedulable[] calls = new Schedulable[numCalls];
|
|
|
+ List<Schedulable> calls = new ArrayList<>();
|
|
|
+ for (int i=0; i < numQueues; i++) {
|
|
|
+ Schedulable call = mockCall("u", i);
|
|
|
+ calls.add(call);
|
|
|
+ fcq.add(call);
|
|
|
+ }
|
|
|
+
|
|
|
+ final AtomicInteger currentIndex = new AtomicInteger();
|
|
|
+ fcq.setMultiplexer(new RpcMultiplexer(){
|
|
|
+ @Override
|
|
|
+ public int getAndAdvanceCurrentIndex() {
|
|
|
+ return currentIndex.get();
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // if there is no call at a given index, return the next highest
|
|
|
+ // priority call available.
|
|
|
+ // v
|
|
|
+ //0123456789
|
|
|
+ currentIndex.set(3);
|
|
|
+ assertSame(calls.get(3), fcq.poll());
|
|
|
+ assertSame(calls.get(0), fcq.poll());
|
|
|
+ assertSame(calls.get(1), fcq.poll());
|
|
|
+ // v
|
|
|
+ //--2-456789
|
|
|
+ currentIndex.set(6);
|
|
|
+ assertSame(calls.get(6), fcq.poll());
|
|
|
+ assertSame(calls.get(2), fcq.poll());
|
|
|
+ assertSame(calls.get(4), fcq.poll());
|
|
|
+ // v
|
|
|
+ //-----5-789
|
|
|
+ currentIndex.set(8);
|
|
|
+ assertSame(calls.get(8), fcq.poll());
|
|
|
+ // v
|
|
|
+ //-----5-7-9
|
|
|
+ currentIndex.set(9);
|
|
|
+ assertSame(calls.get(9), fcq.poll());
|
|
|
+ assertSame(calls.get(5), fcq.poll());
|
|
|
+ assertSame(calls.get(7), fcq.poll());
|
|
|
+ //----------
|
|
|
+ assertNull(fcq.poll());
|
|
|
+ assertNull(fcq.poll());
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// Ensure that FairCallQueue properly implements BlockingQueue
|
|
|
//
|