|
@@ -19,6 +19,7 @@ package org.apache.ambari.server.agent;
|
|
|
|
|
|
import static org.junit.Assert.*;
|
|
import static org.junit.Assert.*;
|
|
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
@@ -133,10 +134,14 @@ public class TestActionQueue {
|
|
ActionQueueOperation[] dequeOperators = new ActionQueueOperation[threadCount];
|
|
ActionQueueOperation[] dequeOperators = new ActionQueueOperation[threadCount];
|
|
ActionQueueOperation[] dequeAllOperators = new ActionQueueOperation[threadCount];
|
|
ActionQueueOperation[] dequeAllOperators = new ActionQueueOperation[threadCount];
|
|
|
|
|
|
|
|
+ List<Thread> producers = new ArrayList<Thread>();
|
|
|
|
+ List<Thread> consumers = new ArrayList<Thread>();
|
|
|
|
+
|
|
for (int i = 0; i < threadCount; i++) {
|
|
for (int i = 0; i < threadCount; i++) {
|
|
dequeOperators[i] = new ActionQueueOperation(aq, hosts,
|
|
dequeOperators[i] = new ActionQueueOperation(aq, hosts,
|
|
ActionQueueOperation.OpType.DEQUEUE);
|
|
ActionQueueOperation.OpType.DEQUEUE);
|
|
Thread t = new Thread(dequeOperators[i]);
|
|
Thread t = new Thread(dequeOperators[i]);
|
|
|
|
+ consumers.add(t);
|
|
t.start();
|
|
t.start();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -144,6 +149,7 @@ public class TestActionQueue {
|
|
enqueOperators[i] = new ActionQueueOperation(aq, hosts,
|
|
enqueOperators[i] = new ActionQueueOperation(aq, hosts,
|
|
ActionQueueOperation.OpType.ENQUEUE);
|
|
ActionQueueOperation.OpType.ENQUEUE);
|
|
Thread t = new Thread(enqueOperators[i]);
|
|
Thread t = new Thread(enqueOperators[i]);
|
|
|
|
+ producers.add(t);
|
|
t.start();
|
|
t.start();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -151,6 +157,7 @@ public class TestActionQueue {
|
|
dequeAllOperators[i] = new ActionQueueOperation(aq, hosts,
|
|
dequeAllOperators[i] = new ActionQueueOperation(aq, hosts,
|
|
ActionQueueOperation.OpType.DEQUEUEALL);
|
|
ActionQueueOperation.OpType.DEQUEUEALL);
|
|
Thread t = new Thread(dequeAllOperators[i]);
|
|
Thread t = new Thread(dequeAllOperators[i]);
|
|
|
|
+ consumers.add(t);
|
|
t.start();
|
|
t.start();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -162,6 +169,10 @@ public class TestActionQueue {
|
|
enqueOperators[i].stop();
|
|
enqueOperators[i].stop();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ for (Thread producer : producers) {
|
|
|
|
+ producer.join();
|
|
|
|
+ }
|
|
|
|
+
|
|
// Give time to get everything dequeued
|
|
// Give time to get everything dequeued
|
|
boolean allDequeued = false;
|
|
boolean allDequeued = false;
|
|
while (!allDequeued) {
|
|
while (!allDequeued) {
|
|
@@ -180,6 +191,10 @@ public class TestActionQueue {
|
|
dequeOperators[i].stop();
|
|
dequeOperators[i].stop();
|
|
dequeAllOperators[i].stop();
|
|
dequeAllOperators[i].stop();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ for (Thread consumer : consumers) {
|
|
|
|
+ consumer.join();
|
|
|
|
+ }
|
|
|
|
|
|
for (int h = 0; h<hosts.length; h++) {
|
|
for (int h = 0; h<hosts.length; h++) {
|
|
long opsEnqueued = 0;
|
|
long opsEnqueued = 0;
|