|
@@ -82,7 +82,17 @@ public class QuorumCnxManager {
|
|
*/
|
|
*/
|
|
|
|
|
|
private long observerCounter = -1;
|
|
private long observerCounter = -1;
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Protocol identifier used among peers
|
|
|
|
+ */
|
|
|
|
+ public static final long PROTOCOL_VERSION = -65536L;
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Max buffer size to be read from the network.
|
|
|
|
+ */
|
|
|
|
+ static public final int maxBuffer = 2048;
|
|
|
|
+
|
|
/*
|
|
/*
|
|
* Connection time out value in milliseconds
|
|
* Connection time out value in milliseconds
|
|
*/
|
|
*/
|
|
@@ -136,6 +146,72 @@ public class QuorumCnxManager {
|
|
long sid;
|
|
long sid;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /*
|
|
|
|
+ * This class parses the initial identification sent out by peers with their
|
|
|
|
+ * sid & hostname.
|
|
|
|
+ */
|
|
|
|
+ static public class InitialMessage {
|
|
|
|
+ public Long sid;
|
|
|
|
+ public InetSocketAddress electionAddr;
|
|
|
|
+
|
|
|
|
+ InitialMessage(Long sid, InetSocketAddress address) {
|
|
|
|
+ this.sid = sid;
|
|
|
|
+ this.electionAddr = address;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @SuppressWarnings("serial")
|
|
|
|
+ public static class InitialMessageException extends Exception {
|
|
|
|
+ InitialMessageException(String message, Object... args) {
|
|
|
|
+ super(String.format(message, args));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static public InitialMessage parse(Long protocolVersion, DataInputStream din)
|
|
|
|
+ throws InitialMessageException, IOException {
|
|
|
|
+ Long sid;
|
|
|
|
+
|
|
|
|
+ if (protocolVersion != PROTOCOL_VERSION) {
|
|
|
|
+ throw new InitialMessageException(
|
|
|
|
+ "Got unrecognized protocol version %s", protocolVersion);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ sid = din.readLong();
|
|
|
|
+
|
|
|
|
+ int remaining = din.readInt();
|
|
|
|
+ if (remaining <= 0 || remaining > maxBuffer) {
|
|
|
|
+ throw new InitialMessageException(
|
|
|
|
+ "Unreasonable buffer length: %s", remaining);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ byte[] b = new byte[remaining];
|
|
|
|
+ int num_read = din.read(b);
|
|
|
|
+
|
|
|
|
+ if (num_read != remaining) {
|
|
|
|
+ throw new InitialMessageException(
|
|
|
|
+ "Read only %s bytes out of %s sent by server %s",
|
|
|
|
+ num_read, remaining, sid);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // FIXME: IPv6 is not supported. Using something like Guava's HostAndPort
|
|
|
|
+ // parser would be good.
|
|
|
|
+ String addr = new String(b);
|
|
|
|
+ String[] host_port = addr.split(":");
|
|
|
|
+
|
|
|
|
+ if (host_port.length != 2) {
|
|
|
|
+ throw new InitialMessageException("Badly formed address: %s", addr);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int port;
|
|
|
|
+ try {
|
|
|
|
+ port = Integer.parseInt(host_port[1]);
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
+ throw new InitialMessageException("Bad port number: %s", host_port[1]);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public QuorumCnxManager(QuorumPeer self) {
|
|
public QuorumCnxManager(QuorumPeer self) {
|
|
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
|
|
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
|
|
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
|
|
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
|
|
@@ -179,7 +255,7 @@ public class QuorumCnxManager {
|
|
// Sending id and challenge
|
|
// Sending id and challenge
|
|
dout = new DataOutputStream(sock.getOutputStream());
|
|
dout = new DataOutputStream(sock.getOutputStream());
|
|
// represents protocol version (in other words - message type)
|
|
// represents protocol version (in other words - message type)
|
|
- dout.writeLong(0xffff0000);
|
|
|
|
|
|
+ dout.writeLong(PROTOCOL_VERSION);
|
|
dout.writeLong(self.getId());
|
|
dout.writeLong(self.getId());
|
|
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
|
|
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
|
|
byte[] addr_bytes = addr.getBytes();
|
|
byte[] addr_bytes = addr.getBytes();
|
|
@@ -229,31 +305,28 @@ public class QuorumCnxManager {
|
|
* possible long value to lose the challenge.
|
|
* possible long value to lose the challenge.
|
|
*
|
|
*
|
|
*/
|
|
*/
|
|
- public boolean receiveConnection(Socket sock) {
|
|
|
|
|
|
+ public void receiveConnection(Socket sock) {
|
|
Long sid = null, protocolVersion = null;
|
|
Long sid = null, protocolVersion = null;
|
|
InetSocketAddress electionAddr = null;
|
|
InetSocketAddress electionAddr = null;
|
|
|
|
+
|
|
try {
|
|
try {
|
|
DataInputStream din = new DataInputStream(sock.getInputStream());
|
|
DataInputStream din = new DataInputStream(sock.getInputStream());
|
|
|
|
+
|
|
protocolVersion = din.readLong();
|
|
protocolVersion = din.readLong();
|
|
if (protocolVersion >= 0) { // this is a server id and not a protocol version
|
|
if (protocolVersion >= 0) { // this is a server id and not a protocol version
|
|
sid = protocolVersion;
|
|
sid = protocolVersion;
|
|
} else {
|
|
} else {
|
|
- sid = din.readLong();
|
|
|
|
- int num_remaining_bytes = din.readInt();
|
|
|
|
- byte[] b = new byte[num_remaining_bytes];
|
|
|
|
- int num_read = din.read(b);
|
|
|
|
- if (num_read == num_remaining_bytes) {
|
|
|
|
- if (protocolVersion == 0xffff0000) {
|
|
|
|
- String addr = new String(b);
|
|
|
|
- String[] host_port = addr.split(":");
|
|
|
|
- electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));
|
|
|
|
- } else {
|
|
|
|
- LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
|
|
|
|
|
|
+ try {
|
|
|
|
+ InitialMessage init = InitialMessage.parse(protocolVersion, din);
|
|
|
|
+ sid = init.sid;
|
|
|
|
+ electionAddr = init.electionAddr;
|
|
|
|
+ } catch (InitialMessage.InitialMessageException ex) {
|
|
|
|
+ LOG.error(ex.toString());
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
if (sid == QuorumPeer.OBSERVER_ID) {
|
|
if (sid == QuorumPeer.OBSERVER_ID) {
|
|
/*
|
|
/*
|
|
* Choose identifier at random. We need a value to identify
|
|
* Choose identifier at random. We need a value to identify
|
|
@@ -261,12 +334,12 @@ public class QuorumCnxManager {
|
|
*/
|
|
*/
|
|
|
|
|
|
sid = observerCounter--;
|
|
sid = observerCounter--;
|
|
- LOG.info("Setting arbitrary identifier to observer: " + sid);
|
|
|
|
|
|
+ LOG.info("Setting arbitrary identifier to observer: {}", sid);
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
closeSocket(sock);
|
|
closeSocket(sock);
|
|
- LOG.warn("Exception reading or writing challenge: " + e.toString());
|
|
|
|
- return false;
|
|
|
|
|
|
+ LOG.warn("Exception reading or writing challenge: {}", e.toString());
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
|
|
//If wins the challenge, then close the new connection.
|
|
//If wins the challenge, then close the new connection.
|
|
@@ -284,7 +357,7 @@ public class QuorumCnxManager {
|
|
/*
|
|
/*
|
|
* Now we start a new connection
|
|
* Now we start a new connection
|
|
*/
|
|
*/
|
|
- LOG.debug("Create new connection to server: " + sid);
|
|
|
|
|
|
+ LOG.debug("Create new connection to server: {}", sid);
|
|
closeSocket(sock);
|
|
closeSocket(sock);
|
|
|
|
|
|
if (electionAddr != null) {
|
|
if (electionAddr != null) {
|
|
@@ -293,28 +366,25 @@ public class QuorumCnxManager {
|
|
connectOne(sid);
|
|
connectOne(sid);
|
|
}
|
|
}
|
|
|
|
|
|
- // Otherwise start worker threads to receive data.
|
|
|
|
- } else {
|
|
|
|
|
|
+ } else { // Otherwise start worker threads to receive data.
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
RecvWorker rw = new RecvWorker(sock, sid, sw);
|
|
RecvWorker rw = new RecvWorker(sock, sid, sw);
|
|
sw.setRecv(rw);
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
|
|
|
|
- if(vsw != null)
|
|
|
|
|
|
+ if (vsw != null) {
|
|
vsw.finish();
|
|
vsw.finish();
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
senderWorkerMap.put(sid, sw);
|
|
senderWorkerMap.put(sid, sw);
|
|
-
|
|
|
|
- queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
|
- SEND_CAPACITY));
|
|
|
|
|
|
+
|
|
|
|
+ queueSendMap.putIfAbsent(sid,
|
|
|
|
+ new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
|
|
|
|
|
|
sw.start();
|
|
sw.start();
|
|
rw.start();
|
|
rw.start();
|
|
-
|
|
|
|
- return true;
|
|
|
|
}
|
|
}
|
|
- return false;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|