|
@@ -0,0 +1,249 @@
|
|
|
|
+/**
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
|
+ * distributed with this work for additional information
|
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
|
+ * limitations under the License.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+package org.apache.zookeeper.server.quorum;
|
|
|
|
+
|
|
|
|
+import java.io.File;
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
|
|
+import java.nio.ByteBuffer;
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import org.apache.zookeeper.PortAssignment;
|
|
|
|
+import org.apache.zookeeper.ZKTestCase;
|
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
|
+import org.apache.zookeeper.test.ClientBase;
|
|
|
|
+import org.junit.After;
|
|
|
|
+import org.junit.Assert;
|
|
|
|
+import org.junit.Before;
|
|
|
|
+import org.junit.Test;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+public class FLEMalformedNotificationMessageTest extends ZKTestCase {
|
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(FLEMalformedNotificationMessageTest.class);
|
|
|
|
+ private static final byte[] CONFIG_BYTES = "my very invalid config string".getBytes();
|
|
|
|
+ private static final int CONFIG_BYTES_LENGTH = CONFIG_BYTES.length;
|
|
|
|
+
|
|
|
|
+ int count;
|
|
|
|
+ HashMap<Long, QuorumServer> peers;
|
|
|
|
+ File tmpdir[];
|
|
|
|
+ int port[];
|
|
|
|
+
|
|
|
|
+ QuorumCnxManager mockCnxManager;
|
|
|
|
+ FLETestUtils.LEThread leaderElectionThread;
|
|
|
|
+ QuorumPeer peerRunningLeaderElection;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Before
|
|
|
|
+ public void setUp() throws Exception {
|
|
|
|
+ count = 3;
|
|
|
|
+
|
|
|
|
+ peers = new HashMap<>(count);
|
|
|
|
+ tmpdir = new File[count];
|
|
|
|
+ port = new int[count];
|
|
|
|
+
|
|
|
|
+ LOG.info("FLEMalformedNotificationMessageTest: {}, {}", getTestName(), count);
|
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
|
+ int clientport = PortAssignment.unique();
|
|
|
|
+ peers.put((long) i,
|
|
|
|
+ new QuorumServer(i,
|
|
|
|
+ new InetSocketAddress(clientport),
|
|
|
|
+ new InetSocketAddress(PortAssignment.unique())));
|
|
|
|
+ tmpdir[i] = ClientBase.createTmpDir();
|
|
|
|
+ port[i] = clientport;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Start server 0
|
|
|
|
+ */
|
|
|
|
+ peerRunningLeaderElection = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
|
|
|
|
+ peerRunningLeaderElection.startLeaderElection();
|
|
|
|
+ leaderElectionThread = new FLETestUtils.LEThread(peerRunningLeaderElection, 0);
|
|
|
|
+ leaderElectionThread.start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @After
|
|
|
|
+ public void tearDown() throws Exception {
|
|
|
|
+ peerRunningLeaderElection.shutdown();
|
|
|
|
+ mockCnxManager.halt();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testTooShortPartialNotificationMessage() throws Exception {
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Start mock server 1, send a message too short to be compatible with any protocol version
|
|
|
|
+ * This simulates the case when only some parts of the whole message is received.
|
|
|
|
+ */
|
|
|
|
+ startMockServer(1);
|
|
|
|
+ byte requestBytes[] = new byte[12];
|
|
|
|
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
|
+ requestBuffer.clear();
|
|
|
|
+ requestBuffer.putInt(ServerState.LOOKING.ordinal()); // state
|
|
|
|
+ requestBuffer.putLong(0); // leader
|
|
|
|
+ mockCnxManager.toSend(0L, requestBuffer);
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Assert that the message receiver thread in leader election is still healthy:
|
|
|
|
+ * we are sending valid votes and waiting for the leader election to be finished.
|
|
|
|
+ */
|
|
|
|
+ sendValidNotifications(1, 0);
|
|
|
|
+ leaderElectionThread.join(5000);
|
|
|
|
+ if (leaderElectionThread.isAlive()) {
|
|
|
|
+ Assert.fail("Leader election thread didn't join, something went wrong.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testNotificationMessageWithNegativeConfigLength() throws Exception {
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Start mock server 1, send a message with negative configLength field
|
|
|
|
+ */
|
|
|
|
+ startMockServer(1);
|
|
|
|
+ byte requestBytes[] = new byte[48];
|
|
|
|
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
|
+ requestBuffer.clear();
|
|
|
|
+ requestBuffer.putInt(ServerState.LOOKING.ordinal()); // state
|
|
|
|
+ requestBuffer.putLong(0); // leader
|
|
|
|
+ requestBuffer.putLong(0); // zxid
|
|
|
|
+ requestBuffer.putLong(0); // electionEpoch
|
|
|
|
+ requestBuffer.putLong(0); // epoch
|
|
|
|
+ requestBuffer.putInt(FastLeaderElection.Notification.CURRENTVERSION); // version
|
|
|
|
+ requestBuffer.putInt(-123); // configData.length
|
|
|
|
+ mockCnxManager.toSend(0L, requestBuffer);
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Assert that the message receiver thread in leader election is still healthy:
|
|
|
|
+ * we are sending valid votes and waiting for the leader election to be finished.
|
|
|
|
+ */
|
|
|
|
+ sendValidNotifications(1, 0);
|
|
|
|
+ leaderElectionThread.join(5000);
|
|
|
|
+ if (leaderElectionThread.isAlive()) {
|
|
|
|
+ Assert.fail("Leader election thread didn't join, something went wrong.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testNotificationMessageWithInvalidConfigLength() throws Exception {
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Start mock server 1, send a message with an invalid configLength field
|
|
|
|
+ * (instead of sending CONFIG_BYTES_LENGTH, we send 10000)
|
|
|
|
+ */
|
|
|
|
+ startMockServer(1);
|
|
|
|
+ byte requestBytes[] = new byte[48 + CONFIG_BYTES_LENGTH];
|
|
|
|
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
|
+ requestBuffer.clear();
|
|
|
|
+ requestBuffer.putInt(ServerState.LOOKING.ordinal()); // state
|
|
|
|
+ requestBuffer.putLong(0); // leader
|
|
|
|
+ requestBuffer.putLong(0); // zxid
|
|
|
|
+ requestBuffer.putLong(0); // electionEpoch
|
|
|
|
+ requestBuffer.putLong(0); // epoch
|
|
|
|
+ requestBuffer.putInt(FastLeaderElection.Notification.CURRENTVERSION); // version
|
|
|
|
+ requestBuffer.putInt(10000); // configData.length
|
|
|
|
+ requestBuffer.put(CONFIG_BYTES); // configData
|
|
|
|
+ mockCnxManager.toSend(0L, requestBuffer);
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Assert that the message receiver thread in leader election is still healthy:
|
|
|
|
+ * we are sending valid votes and waiting for the leader election to be finished.
|
|
|
|
+ */
|
|
|
|
+ sendValidNotifications(1, 0);
|
|
|
|
+ leaderElectionThread.join(5000);
|
|
|
|
+ if (leaderElectionThread.isAlive()) {
|
|
|
|
+ Assert.fail("Leader election thread didn't join, something went wrong.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testNotificationMessageWithInvalidConfig() throws Exception {
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Start mock server 1, send a message with an invalid config field
|
|
|
|
+ * (the receiver should not be able to parse the config part of the message)
|
|
|
|
+ */
|
|
|
|
+ startMockServer(1);
|
|
|
|
+ ByteBuffer requestBuffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 1, 0, 0, 0, CONFIG_BYTES);
|
|
|
|
+ mockCnxManager.toSend(0L, requestBuffer);
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Assert that the message receiver thread in leader election is still healthy:
|
|
|
|
+ * we are sending valid votes and waiting for the leader election to be finished.
|
|
|
|
+ */
|
|
|
|
+ sendValidNotifications(1, 0);
|
|
|
|
+ leaderElectionThread.join(5000);
|
|
|
|
+ if (leaderElectionThread.isAlive()) {
|
|
|
|
+ Assert.fail("Leader election thread didn't join, something went wrong.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testNotificationMessageWithBadProtocol() throws Exception {
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Start mock server 1, send an invalid 30 bytes long message
|
|
|
|
+ * (the receiver should not be able to parse the message and should skip it)
|
|
|
|
+ * This simulates the case when only some parts of the whole message is received.
|
|
|
|
+ */
|
|
|
|
+ startMockServer(1);
|
|
|
|
+ byte requestBytes[] = new byte[30];
|
|
|
|
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
|
+ requestBuffer.clear();
|
|
|
|
+ requestBuffer.putInt(ServerState.LOOKING.ordinal()); // state
|
|
|
|
+ requestBuffer.putLong(1); // leader
|
|
|
|
+ requestBuffer.putLong(0); // zxid
|
|
|
|
+ requestBuffer.putLong(0); // electionEpoch
|
|
|
|
+ requestBuffer.putShort((short) 0); // this is the first two bytes of a proper
|
|
|
|
+ // 8 bytes Long we should send here
|
|
|
|
+ mockCnxManager.toSend(0L, requestBuffer);
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * Assert that the message receiver thread in leader election is still healthy:
|
|
|
|
+ * we are sending valid votes and waiting for the leader election to be finished.
|
|
|
|
+ */
|
|
|
|
+ sendValidNotifications(1, 0);
|
|
|
|
+ leaderElectionThread.join(5000);
|
|
|
|
+ if (leaderElectionThread.isAlive()) {
|
|
|
|
+ Assert.fail("Leader election thread didn't join, something went wrong.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ void startMockServer(int sid) throws IOException {
|
|
|
|
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], port[sid], 3, sid, 1000, 2, 2, 2);
|
|
|
|
+ mockCnxManager = peer.createCnxnManager();
|
|
|
|
+ mockCnxManager.listener.start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ void sendValidNotifications(int fromSid, int toSid) throws InterruptedException {
|
|
|
|
+ mockCnxManager.toSend((long) toSid, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), fromSid, 0, 0));
|
|
|
|
+ mockCnxManager.recvQueue.take();
|
|
|
|
+ mockCnxManager.toSend((long) toSid, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), toSid, 0, 0));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|