|
@@ -108,6 +108,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
|
|
|
|
@Override
|
|
|
public void notifyStopping(String threadName, int errorCode) {
|
|
|
+ Assert.fail("Commit processor crashed " + errorCode);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
@@ -376,4 +377,129 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
|
!processedRequests.contains(r));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * In the following test, we verify that we can handle the case that we got a commit
|
|
|
+ * of a request we never seen since the session that we just established. This can happen
|
|
|
+ * when a session is just established and there is request waiting to be committed in the
|
|
|
+ * session queue but it sees a commit for a request that belongs to the previous connection.
|
|
|
+ */
|
|
|
+ @Test(timeout = 5000)
|
|
|
+ public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception {
|
|
|
+ final String path = "/noCrash/OnCommittedRequests/OfUnseenRequestTest";
|
|
|
+ final int numberofReads = 10;
|
|
|
+ final int sessionid = 0x123456;
|
|
|
+ final int firstCXid = 0x100;
|
|
|
+ int readReqId = firstCXid;
|
|
|
+ processor.stoppedMainLoop = true;
|
|
|
+ HashSet<Request> localRequests = new HashSet<Request>();
|
|
|
+ // queue the blocking write request to queuedRequests
|
|
|
+ Request firstCommittedReq = newRequest(
|
|
|
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
|
+ OpCode.create, sessionid, readReqId++);
|
|
|
+ processor.queuedRequests.add(firstCommittedReq);
|
|
|
+ localRequests.add(firstCommittedReq);
|
|
|
+
|
|
|
+ // queue read requests to queuedRequests
|
|
|
+ for (; readReqId <= numberofReads+firstCXid; ++readReqId) {
|
|
|
+ Request readReq = newRequest(new GetDataRequest(path, false),
|
|
|
+ OpCode.getData, sessionid, readReqId);
|
|
|
+ processor.queuedRequests.add(readReq);
|
|
|
+ localRequests.add(readReq);
|
|
|
+ }
|
|
|
+
|
|
|
+ //run once
|
|
|
+ Assert.assertTrue(processor.queuedRequests.containsAll(localRequests));
|
|
|
+ processor.initThreads(defaultSizeOfThreadPool);
|
|
|
+ processor.run();
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ //We verify that the processor is waiting for the commit
|
|
|
+ Assert.assertTrue(processedRequests.isEmpty());
|
|
|
+
|
|
|
+ // We add a commit that belongs to the same session but with smaller cxid,
|
|
|
+ // i.e., commit of an update from previous connection of this session.
|
|
|
+ Request preSessionCommittedReq = newRequest(
|
|
|
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
|
+ OpCode.create, sessionid, firstCXid - 2);
|
|
|
+ processor.committedRequests.add(preSessionCommittedReq);
|
|
|
+ processor.committedRequests.add(firstCommittedReq);
|
|
|
+ processor.run();
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ //We verify that the commit processor processed the old commit prior to the newer messages
|
|
|
+ Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq);
|
|
|
+
|
|
|
+ processor.run();
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ //We verify that the commit processor handle all messages.
|
|
|
+ Assert.assertTrue(processedRequests.containsAll(localRequests));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * In the following test, we verify if we handle the case in which we get a commit
|
|
|
+ * for a request that has higher Cxid than the one we are waiting. This can happen
|
|
|
+ * when a session connection is lost but there is a request waiting to be committed in the
|
|
|
+ * session queue. However, since the session has moved, new requests can get to
|
|
|
+ * the leader out of order. Hence, the commits can also arrive "out of order" w.r.t. cxid.
|
|
|
+ * We should commit the requests according to the order we receive from the leader, i.e., wait for the relevant commit.
|
|
|
+ */
|
|
|
+ @Test(timeout = 5000)
|
|
|
+ public void noCrashOnOutofOrderCommittedRequestTest() throws Exception {
|
|
|
+ final String path = "/noCrash/OnCommittedRequests/OfUnSeenRequestTest";
|
|
|
+ final int sessionid = 0x123456;
|
|
|
+ final int lastCXid = 0x100;
|
|
|
+ final int numberofReads = 10;
|
|
|
+ int readReqId = lastCXid;
|
|
|
+ processor.stoppedMainLoop = true;
|
|
|
+ HashSet<Request> localRequests = new HashSet<Request>();
|
|
|
+
|
|
|
+ // queue the blocking write request to queuedRequests
|
|
|
+ Request orphanCommittedReq = newRequest(
|
|
|
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
|
+ OpCode.create, sessionid, lastCXid);
|
|
|
+ processor.queuedRequests.add(orphanCommittedReq);
|
|
|
+ localRequests.add(orphanCommittedReq);
|
|
|
+
|
|
|
+ // queue read requests to queuedRequests
|
|
|
+ for (; readReqId <= numberofReads+lastCXid; ++readReqId) {
|
|
|
+ Request readReq = newRequest(new GetDataRequest(path, false),
|
|
|
+ OpCode.getData, sessionid, readReqId);
|
|
|
+ processor.queuedRequests.add(readReq);
|
|
|
+ localRequests.add(readReq);
|
|
|
+ }
|
|
|
+
|
|
|
+ //run once
|
|
|
+ processor.initThreads(defaultSizeOfThreadPool);
|
|
|
+ processor.run();
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ //We verify that the processor is waiting for the commit
|
|
|
+ Assert.assertTrue(processedRequests.isEmpty());
|
|
|
+
|
|
|
+ // We add a commit that belongs to the same session but with larger cxid,
|
|
|
+ // i.e., commit of an update from the next connection of this session.
|
|
|
+ Request otherSessionCommittedReq = newRequest(
|
|
|
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
|
+ OpCode.create, sessionid, lastCXid+10);
|
|
|
+ processor.committedRequests.add(otherSessionCommittedReq);
|
|
|
+ processor.committedRequests.add(orphanCommittedReq);
|
|
|
+ processor.run();
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ //We verify that the commit processor processed the old commit prior to the newer messages
|
|
|
+ Assert.assertTrue(processedRequests.size() == 1);
|
|
|
+ Assert.assertTrue(processedRequests.contains(otherSessionCommittedReq));
|
|
|
+
|
|
|
+ processor.run();
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ //We verify that the commit processor handle all messages.
|
|
|
+ Assert.assertTrue(processedRequests.containsAll(localRequests));
|
|
|
+ }
|
|
|
}
|