|
@@ -37,6 +37,7 @@ import org.apache.zookeeper.ZKTestCase;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
import org.junit.Assert;
|
|
@@ -274,6 +275,60 @@ public class CnxManagerTest extends ZKTestCase {
|
|
|
Assert.assertFalse(cnxManager.listener.isAlive());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Tests a bug in QuorumCnxManager that causes a NPE when a 3.4.6
|
|
|
+ * observer connects to a 3.5.0 server.
|
|
|
+ * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1789}
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testCnxManagerNPE() throws Exception {
|
|
|
+ // the connecting peer (id = 2) is a 3.4.6 observer
|
|
|
+ peers.get(2L).type = LearnerType.OBSERVER;
|
|
|
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
|
|
|
+ peerClientPort[1], 3, 1, 1000, 2, 2);
|
|
|
+ QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
|
|
|
+ QuorumCnxManager.Listener listener = cnxManager.listener;
|
|
|
+ if (listener != null) {
|
|
|
+ listener.start();
|
|
|
+ } else {
|
|
|
+ LOG.error("Null listener when initializing cnx manager");
|
|
|
+ }
|
|
|
+ int port = peers.get(peer.getId()).electionAddr.getPort();
|
|
|
+ LOG.info("Election port: " + port);
|
|
|
+
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ SocketChannel sc = SocketChannel.open();
|
|
|
+ sc.socket().connect(peers.get(1L).electionAddr, 5000);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Write id (3.4.6 protocol). This previously caused a NPE in
|
|
|
+ * QuorumCnxManager.
|
|
|
+ */
|
|
|
+ byte[] msgBytes = new byte[8];
|
|
|
+ ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
|
|
|
+ msgBuffer.putLong(2L);
|
|
|
+ msgBuffer.position(0);
|
|
|
+ sc.write(msgBuffer);
|
|
|
+
|
|
|
+ msgBuffer = ByteBuffer.wrap(new byte[8]);
|
|
|
+ // write length of message
|
|
|
+ msgBuffer.putInt(4);
|
|
|
+ // write message
|
|
|
+ msgBuffer.putInt(5);
|
|
|
+ msgBuffer.position(0);
|
|
|
+ sc.write(msgBuffer);
|
|
|
+
|
|
|
+ Message m = cnxManager.pollRecvQueue(1000, TimeUnit.MILLISECONDS);
|
|
|
+ Assert.assertNotNull(m);
|
|
|
+
|
|
|
+ peer.shutdown();
|
|
|
+ cnxManager.halt();
|
|
|
+ Assert.assertFalse(cnxManager.listener.isAlive());
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Test if a receiveConnection is able to timeout on socket errors
|
|
|
*/
|