|
@@ -25,7 +25,11 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.SocketException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
|
@@ -193,11 +197,11 @@ public class AuthFastLeaderElection implements Election {
|
|
|
long lastProposedLeader;
|
|
|
long lastProposedZxid;
|
|
|
long lastEpoch;
|
|
|
- final LinkedBlockingQueue<Long> acksqueue;
|
|
|
- final HashMap<Long, Long> challengeMap;
|
|
|
- final HashMap<Long, Semaphore> challengeMutex;
|
|
|
- final HashMap<Long, Semaphore> ackMutex;
|
|
|
- final HashMap<InetSocketAddress, HashMap<Long, Long>> addrChallengeMap;
|
|
|
+ final Set<Long> ackset;
|
|
|
+ final ConcurrentHashMap<Long, Long> challengeMap;
|
|
|
+ final ConcurrentHashMap<Long, Semaphore> challengeMutex;
|
|
|
+ final ConcurrentHashMap<Long, Semaphore> ackMutex;
|
|
|
+ final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
|
|
|
|
|
|
class WorkerReceiver implements Runnable {
|
|
|
|
|
@@ -210,11 +214,9 @@ public class AuthFastLeaderElection implements Election {
|
|
|
}
|
|
|
|
|
|
boolean saveChallenge(long tag, long challenge) {
|
|
|
-
|
|
|
- //Long l = challengeMutex.get(tag);
|
|
|
Semaphore s = challengeMutex.get(tag);
|
|
|
if (s != null) {
|
|
|
- synchronized (challengeMap) {
|
|
|
+ synchronized (Messenger.this) {
|
|
|
challengeMap.put(tag, challenge);
|
|
|
challengeMutex.remove(tag);
|
|
|
}
|
|
@@ -310,27 +312,30 @@ public class AuthFastLeaderElection implements Election {
|
|
|
InetSocketAddress addr = (InetSocketAddress) responsePacket
|
|
|
.getSocketAddress();
|
|
|
if (authEnabled) {
|
|
|
- if (addrChallengeMap.get(addr).get(tag) != null) {
|
|
|
- recChallenge = responseBuffer.getLong();
|
|
|
-
|
|
|
- if (addrChallengeMap.get(addr).get(tag) == recChallenge) {
|
|
|
- recvqueue.offer(n);
|
|
|
-
|
|
|
- ToSend a = new ToSend(ToSend.mType.ack,
|
|
|
- tag, current.id,
|
|
|
- current.zxid,
|
|
|
- logicalclock, self.getPeerState(),
|
|
|
- addr);
|
|
|
-
|
|
|
- sendqueue.offer(a);
|
|
|
+ ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr);
|
|
|
+ if(tmpMap != null){
|
|
|
+ if (tmpMap.get(tag) != null) {
|
|
|
+ recChallenge = responseBuffer.getLong();
|
|
|
+
|
|
|
+ if (tmpMap.get(tag) == recChallenge) {
|
|
|
+ recvqueue.offer(n);
|
|
|
+
|
|
|
+ ToSend a = new ToSend(ToSend.mType.ack,
|
|
|
+ tag, current.id,
|
|
|
+ current.zxid,
|
|
|
+ logicalclock, self.getPeerState(),
|
|
|
+ addr);
|
|
|
+
|
|
|
+ sendqueue.offer(a);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Incorrect challenge: "
|
|
|
+ + recChallenge + ", "
|
|
|
+ + addrChallengeMap.toString());
|
|
|
+ }
|
|
|
} else {
|
|
|
- LOG.warn("Incorrect challenge: "
|
|
|
- + recChallenge + ", "
|
|
|
- + addrChallengeMap.toString());
|
|
|
+ LOG.warn("No challenge for host: " + addr
|
|
|
+ + " " + tag);
|
|
|
}
|
|
|
- } else {
|
|
|
- LOG.warn("No challenge for host: " + addr
|
|
|
- + " " + tag);
|
|
|
}
|
|
|
} else {
|
|
|
recvqueue.offer(n);
|
|
@@ -354,11 +359,17 @@ public class AuthFastLeaderElection implements Election {
|
|
|
s.release();
|
|
|
else LOG.error("Empty ack semaphore");
|
|
|
|
|
|
- acksqueue.offer(tag);
|
|
|
+ ackset.add(tag);
|
|
|
|
|
|
if (authEnabled) {
|
|
|
- addrChallengeMap.get(responsePacket
|
|
|
- .getSocketAddress()).remove(tag);
|
|
|
+ ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(responsePacket
|
|
|
+ .getSocketAddress());
|
|
|
+ if(tmpMap != null) {
|
|
|
+ tmpMap.remove(tag);
|
|
|
+ } else {
|
|
|
+ LOG.warn("No such address in the ensemble configuration " + responsePacket
|
|
|
+ .getSocketAddress());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (ackstate != QuorumPeer.ServerState.LOOKING) {
|
|
@@ -485,40 +496,46 @@ public class AuthFastLeaderElection implements Election {
|
|
|
*/
|
|
|
|
|
|
long newChallenge;
|
|
|
- if (addrChallengeMap.get(m.addr).containsKey(m.tag)) {
|
|
|
- newChallenge = addrChallengeMap.get(m.addr).get(m.tag);
|
|
|
- } else {
|
|
|
- newChallenge = genChallenge();
|
|
|
- }
|
|
|
+ ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr);
|
|
|
+ if(tmpMap != null){
|
|
|
+ Long tmpLong = tmpMap.get(m.tag);
|
|
|
+ if (tmpLong != null) {
|
|
|
+ newChallenge = tmpLong;
|
|
|
+ } else {
|
|
|
+ newChallenge = genChallenge();
|
|
|
+ }
|
|
|
|
|
|
- addrChallengeMap.get(m.addr).put(m.tag, newChallenge);
|
|
|
+ tmpMap.put(m.tag, newChallenge);
|
|
|
|
|
|
- requestBuffer.clear();
|
|
|
- requestBuffer.putInt(ToSend.mType.challenge.ordinal());
|
|
|
- requestBuffer.putLong(m.tag);
|
|
|
- requestBuffer.putInt(m.state.ordinal());
|
|
|
- requestBuffer.putLong(newChallenge);
|
|
|
- zeroes = new byte[24];
|
|
|
- requestBuffer.put(zeroes);
|
|
|
+ requestBuffer.clear();
|
|
|
+ requestBuffer.putInt(ToSend.mType.challenge.ordinal());
|
|
|
+ requestBuffer.putLong(m.tag);
|
|
|
+ requestBuffer.putInt(m.state.ordinal());
|
|
|
+ requestBuffer.putLong(newChallenge);
|
|
|
+ zeroes = new byte[24];
|
|
|
+ requestBuffer.put(zeroes);
|
|
|
|
|
|
- requestPacket.setLength(48);
|
|
|
- try {
|
|
|
- requestPacket.setSocketAddress(m.addr);
|
|
|
- } catch (IllegalArgumentException e) {
|
|
|
- // Sun doesn't include the address that causes this
|
|
|
- // exception to be thrown, so we wrap the exception
|
|
|
- // in order to capture this critical detail.
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "Unable to set socket address on packet, msg:"
|
|
|
- + e.getMessage() + " with addr:" + m.addr,
|
|
|
- e);
|
|
|
- }
|
|
|
+ requestPacket.setLength(48);
|
|
|
+ try {
|
|
|
+ requestPacket.setSocketAddress(m.addr);
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
+ // Sun doesn't include the address that causes this
|
|
|
+ // exception to be thrown, so we wrap the exception
|
|
|
+ // in order to capture this critical detail.
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Unable to set socket address on packet, msg:"
|
|
|
+ + e.getMessage() + " with addr:" + m.addr,
|
|
|
+ e);
|
|
|
+ }
|
|
|
|
|
|
|
|
|
- try {
|
|
|
- mySocket.send(requestPacket);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Exception while sending challenge: ", e);
|
|
|
+ try {
|
|
|
+ mySocket.send(requestPacket);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Exception while sending challenge: ", e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.error("Address is not in the configuration: " + m.addr);
|
|
|
}
|
|
|
|
|
|
break;
|
|
@@ -573,9 +590,8 @@ public class AuthFastLeaderElection implements Election {
|
|
|
double timeout = ackWait
|
|
|
* java.lang.Math.pow(2, attempts);
|
|
|
|
|
|
- //Long l = new Long(m.tag);
|
|
|
Semaphore s = new Semaphore(0);
|
|
|
- synchronized (s) {
|
|
|
+ synchronized(Messenger.this) {
|
|
|
challengeMutex.put(m.tag, s);
|
|
|
s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
|
|
|
myChallenge = challengeMap
|
|
@@ -598,7 +614,12 @@ public class AuthFastLeaderElection implements Election {
|
|
|
|
|
|
if (authEnabled) {
|
|
|
requestBuffer.position(40);
|
|
|
- requestBuffer.putLong(challengeMap.get(m.tag));
|
|
|
+ Long tmpLong = challengeMap.get(m.tag);
|
|
|
+ if(tmpLong != null){
|
|
|
+ requestBuffer.putLong(tmpLong);
|
|
|
+ } else {
|
|
|
+ LOG.warn("No challenge with tag: " + m.tag);
|
|
|
+ }
|
|
|
}
|
|
|
mySocket.send(requestPacket);
|
|
|
try {
|
|
@@ -610,26 +631,11 @@ public class AuthFastLeaderElection implements Election {
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Ack exception: ", e);
|
|
|
}
|
|
|
- synchronized (acksqueue) {
|
|
|
- for (int i = 0; i < acksqueue.size(); ++i) {
|
|
|
- Long newack = acksqueue.poll();
|
|
|
-
|
|
|
- /*
|
|
|
- * Under highly concurrent load, a thread
|
|
|
- * may get into this loop but by the time it
|
|
|
- * tries to read from the queue, the queue
|
|
|
- * is empty. There are two alternatives:
|
|
|
- * synchronize this block, or test if newack
|
|
|
- * is null.
|
|
|
- *
|
|
|
- */
|
|
|
-
|
|
|
- if (newack == m.tag) {
|
|
|
- myAck = true;
|
|
|
- } else
|
|
|
- acksqueue.offer(newack);
|
|
|
- }
|
|
|
- }
|
|
|
+
|
|
|
+ if(ackset.remove(m.tag)){
|
|
|
+ myAck = true;
|
|
|
+ }
|
|
|
+
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Sending exception: ", e);
|
|
|
/*
|
|
@@ -640,8 +646,8 @@ public class AuthFastLeaderElection implements Election {
|
|
|
/*
|
|
|
* Received ack successfully, so return
|
|
|
*/
|
|
|
- if (challengeMap.get(m.tag) != null)
|
|
|
- challengeMap.remove(m.tag);
|
|
|
+ challengeMap.remove(m.tag);
|
|
|
+
|
|
|
return;
|
|
|
} else
|
|
|
attempts++;
|
|
@@ -690,17 +696,17 @@ public class AuthFastLeaderElection implements Election {
|
|
|
}
|
|
|
|
|
|
public boolean queueEmpty() {
|
|
|
- return (sendqueue.isEmpty() || acksqueue.isEmpty() || recvqueue
|
|
|
+ return (sendqueue.isEmpty() || ackset.isEmpty() || recvqueue
|
|
|
.isEmpty());
|
|
|
}
|
|
|
|
|
|
Messenger(int threads, DatagramSocket s) {
|
|
|
mySocket = s;
|
|
|
- acksqueue = new LinkedBlockingQueue<Long>();
|
|
|
- challengeMap = new HashMap<Long, Long>();
|
|
|
- challengeMutex = new HashMap<Long, Semaphore>();
|
|
|
- ackMutex = new HashMap<Long, Semaphore>();
|
|
|
- addrChallengeMap = new HashMap<InetSocketAddress, HashMap<Long, Long>>();
|
|
|
+ ackset = Collections.<Long>newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
|
|
|
+ challengeMap = new ConcurrentHashMap<Long, Long>();
|
|
|
+ challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
|
|
|
+ ackMutex = new ConcurrentHashMap<Long, Semaphore>();
|
|
|
+ addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
|
|
|
lastProposedLeader = 0;
|
|
|
lastProposedZxid = 0;
|
|
|
lastEpoch = 0;
|
|
@@ -715,7 +721,7 @@ public class AuthFastLeaderElection implements Election {
|
|
|
for (QuorumServer server : self.getVotingView().values()) {
|
|
|
InetSocketAddress saddr = new InetSocketAddress(server.addr
|
|
|
.getAddress(), port);
|
|
|
- addrChallengeMap.put(saddr, new HashMap<Long, Long>());
|
|
|
+ addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
|
|
|
}
|
|
|
|
|
|
Thread t = new Thread(new WorkerReceiver(s, this),
|