|
@@ -25,6 +25,9 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.NoSuchElementException;
|
|
import java.util.NoSuchElementException;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -36,73 +39,76 @@ public class ActionQueue {
|
|
|
|
|
|
private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class);
|
|
private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class);
|
|
|
|
|
|
- Map<String, Queue<AgentCommand>> hostQueues;
|
|
|
|
|
|
+ final ConcurrentMap<String, Queue<AgentCommand>> hostQueues;
|
|
|
|
|
|
public ActionQueue() {
|
|
public ActionQueue() {
|
|
- hostQueues = new HashMap<String, Queue<AgentCommand>>();
|
|
|
|
|
|
+ hostQueues = new ConcurrentHashMap<String, Queue<AgentCommand>>();
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized Queue<AgentCommand> getQueue(String hostname) {
|
|
|
|
|
|
+ private Queue<AgentCommand> getQueue(String hostname) {
|
|
return hostQueues.get(hostname);
|
|
return hostQueues.get(hostname);
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void addQueue(String hostname, Queue<AgentCommand> q) {
|
|
|
|
- hostQueues.put(hostname, q);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Adds command to queue for given hostname
|
|
|
|
+ * @param hostname - hostname of node
|
|
|
|
+ * @param cmd - command to add to queue
|
|
|
|
+ */
|
|
public void enqueue(String hostname, AgentCommand cmd) {
|
|
public void enqueue(String hostname, AgentCommand cmd) {
|
|
- Queue<AgentCommand> q;
|
|
|
|
- synchronized (this) {
|
|
|
|
- q = getQueue(hostname);
|
|
|
|
|
|
+ Queue<AgentCommand> q = getQueue(hostname);
|
|
|
|
+
|
|
|
|
+ if (q == null) {
|
|
|
|
+ //try to add new queue to map if not found
|
|
|
|
+ q = hostQueues.putIfAbsent(hostname, new ConcurrentLinkedQueue<AgentCommand>());
|
|
if (q == null) {
|
|
if (q == null) {
|
|
- addQueue(hostname, new LinkedList<AgentCommand>());
|
|
|
|
|
|
+ //null means that new queue was added to map, get it
|
|
q = getQueue(hostname);
|
|
q = getQueue(hostname);
|
|
}
|
|
}
|
|
|
|
+ //otherwise we got existing queue (and put nothing!)
|
|
}
|
|
}
|
|
- synchronized (q) {
|
|
|
|
- if (q.contains(cmd)) {
|
|
|
|
- LOG.warn("cmd already exists in the queue, not adding again");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- q.add(cmd);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ q.add(cmd);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get command from queue for given hostname
|
|
|
|
+ * @param hostname
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
public AgentCommand dequeue(String hostname) {
|
|
public AgentCommand dequeue(String hostname) {
|
|
Queue<AgentCommand> q = getQueue(hostname);
|
|
Queue<AgentCommand> q = getQueue(hostname);
|
|
if (q == null) {
|
|
if (q == null) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
- synchronized (q) {
|
|
|
|
- if (q.isEmpty()) {
|
|
|
|
- return null;
|
|
|
|
- } else {
|
|
|
|
- return q.remove();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ return q.poll();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Try to dequeue command with provided id.
|
|
|
|
+ * @param hostname
|
|
|
|
+ * @param commandId
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
public AgentCommand dequeue(String hostname, String commandId) {
|
|
public AgentCommand dequeue(String hostname, String commandId) {
|
|
Queue<AgentCommand> q = getQueue(hostname);
|
|
Queue<AgentCommand> q = getQueue(hostname);
|
|
if (q == null) {
|
|
if (q == null) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
- synchronized (q) {
|
|
|
|
- if (q.isEmpty()) {
|
|
|
|
- return null;
|
|
|
|
- } else {
|
|
|
|
- AgentCommand c = null;
|
|
|
|
- for (Iterator it = q.iterator(); it.hasNext();) {
|
|
|
|
- AgentCommand ac = (AgentCommand) it.next();
|
|
|
|
- if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
|
|
|
|
- .getCommandId().equals(commandId)) {
|
|
|
|
- c = ac;
|
|
|
|
- it.remove();
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
|
|
+ if (q.isEmpty()) {
|
|
|
|
+ return null;
|
|
|
|
+ } else {
|
|
|
|
+ AgentCommand c = null;
|
|
|
|
+ for (Iterator it = q.iterator(); it.hasNext(); ) {
|
|
|
|
+ AgentCommand ac = (AgentCommand) it.next();
|
|
|
|
+ if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
|
|
|
|
+ .getCommandId().equals(commandId)) {
|
|
|
|
+ c = ac;
|
|
|
|
+ it.remove();
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
- return c;
|
|
|
|
}
|
|
}
|
|
|
|
+ return c;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -111,9 +117,7 @@ public class ActionQueue {
|
|
if (q == null) {
|
|
if (q == null) {
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
- synchronized(q) {
|
|
|
|
return q.size();
|
|
return q.size();
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public List<AgentCommand> dequeueAll(String hostname) {
|
|
public List<AgentCommand> dequeueAll(String hostname) {
|
|
@@ -122,17 +126,17 @@ public class ActionQueue {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
List<AgentCommand> l = new ArrayList<AgentCommand>();
|
|
List<AgentCommand> l = new ArrayList<AgentCommand>();
|
|
- synchronized (q) {
|
|
|
|
- while (true) {
|
|
|
|
- try {
|
|
|
|
- AgentCommand cmd = q.remove();
|
|
|
|
- if (cmd != null) {
|
|
|
|
- l.add(cmd);
|
|
|
|
- }
|
|
|
|
- } catch (NoSuchElementException ex) {
|
|
|
|
- return l;
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ AgentCommand command;
|
|
|
|
+ do {
|
|
|
|
+ //get commands from queue until empty
|
|
|
|
+ command = q.poll();
|
|
|
|
+ if (command != null) {
|
|
|
|
+ l.add(command);
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ } while (command != null);
|
|
|
|
+
|
|
|
|
+ return l;
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|