|
@@ -18,63 +18,181 @@
|
|
|
|
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
-import static org.junit.Assert.*;
|
|
|
+import org.apache.jute.BinaryOutputArchive;
|
|
|
+import org.apache.jute.Record;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
|
|
+import org.apache.zookeeper.KeeperException.SessionMovedException;
|
|
|
+import org.apache.zookeeper.MultiTransactionRecord;
|
|
|
+import org.apache.zookeeper.Op;
|
|
|
+import org.apache.zookeeper.PortAssignment;
|
|
|
+import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
+import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
+import org.apache.zookeeper.data.Id;
|
|
|
+import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
|
|
|
+import org.apache.zookeeper.test.ClientBase;
|
|
|
+import org.apache.zookeeper.txn.ErrorTxn;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
-import org.apache.zookeeper.PortAssignment;
|
|
|
-import org.apache.zookeeper.KeeperException.Code;
|
|
|
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
|
|
-import org.apache.zookeeper.KeeperException.SessionMovedException;
|
|
|
-import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
-import org.apache.zookeeper.server.PrepRequestProcessor;
|
|
|
-import org.apache.zookeeper.server.Request;
|
|
|
-import org.apache.zookeeper.server.RequestProcessor;
|
|
|
-import org.apache.zookeeper.server.ServerCnxnFactory;
|
|
|
-import org.apache.zookeeper.server.SyncRequestProcessor;
|
|
|
-import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
-import org.apache.zookeeper.test.ClientBase;
|
|
|
-import org.apache.zookeeper.txn.ErrorTxn;
|
|
|
-import org.junit.Assert;
|
|
|
-import org.junit.Test;
|
|
|
-
|
|
|
public class PrepRequestProcessorTest extends ClientBase {
|
|
|
- private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorTest.class);
|
|
|
private static final int CONNECTION_TIMEOUT = 3000;
|
|
|
- private final CountDownLatch testEnd = new CountDownLatch(1);
|
|
|
+ private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
|
|
|
+ private CountDownLatch pLatch;
|
|
|
|
|
|
- @Test
|
|
|
- public void testPRequest() throws Exception {
|
|
|
+ private ZooKeeperServer zks;
|
|
|
+ private ServerCnxnFactory servcnxnf;
|
|
|
+ private PrepRequestProcessor processor;
|
|
|
+ private Request outcome;
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setup() throws Exception {
|
|
|
File tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
- ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
+ zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
|
|
|
SyncRequestProcessor.setSnapCount(100);
|
|
|
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
|
|
|
- ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
- f.startup(zks);
|
|
|
+
|
|
|
+ servcnxnf = ServerCnxnFactory.createFactory(PORT, -1);
|
|
|
+ servcnxnf.startup(zks);
|
|
|
Assert.assertTrue("waiting for server being up ",
|
|
|
- ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
|
|
|
- zks.sessionTracker = new MySessionTracker();
|
|
|
- PrepRequestProcessor processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
|
|
|
+ ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
|
|
|
+ zks.sessionTracker = new MySessionTracker();
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void teardown() throws Exception {
|
|
|
+ if (servcnxnf != null) {
|
|
|
+ servcnxnf.shutdown();
|
|
|
+ }
|
|
|
+ if (zks != null) {
|
|
|
+ zks.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPRequest() throws Exception {
|
|
|
+ pLatch = new CountDownLatch(1);
|
|
|
+ processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
|
|
|
Request foo = new Request(null, 1l, 1, OpCode.create, ByteBuffer.allocate(3), null);
|
|
|
processor.pRequest(foo);
|
|
|
- testEnd.await(5, java.util.concurrent.TimeUnit.SECONDS);
|
|
|
- f.shutdown();
|
|
|
- zks.shutdown();
|
|
|
+
|
|
|
+ Assert.assertEquals("Request should have marshalling error", new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue()),
|
|
|
+ outcome.getTxn());
|
|
|
+ Assert.assertTrue("request hasn't been processed in chain",
|
|
|
+ pLatch.await(5, java.util.concurrent.TimeUnit.SECONDS));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Request createMultiRequest(List<Op> ops) throws IOException {
|
|
|
+ Record record = new MultiTransactionRecord(ops);
|
|
|
+
|
|
|
+ // encoding
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
|
|
|
+ record.serialize(boa, "request");
|
|
|
+ baos.close();
|
|
|
+
|
|
|
+ // Id
|
|
|
+ List<Id> ids = Arrays.asList(Ids.ANYONE_ID_UNSAFE);
|
|
|
+
|
|
|
+ return new Request(null, 1l, 0, OpCode.multi, ByteBuffer.wrap(baos.toByteArray()), ids);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void process(List<Op> ops) throws Exception {
|
|
|
+ pLatch = new CountDownLatch(1);
|
|
|
+ processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
|
|
|
+
|
|
|
+ Request req = createMultiRequest(ops);
|
|
|
+
|
|
|
+ processor.pRequest(req);
|
|
|
+ Assert.assertTrue("request hasn't been processed in chain",
|
|
|
+ pLatch.await(5, java.util.concurrent.TimeUnit.SECONDS));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This test checks that a successful multi will change outstanding record
|
|
|
+ * and failed multi shouldn't change outstanding record.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testMultiOutstandingChange() throws Exception {
|
|
|
+ zks.getZKDatabase().dataTree.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, 0, 0, 0, 0);
|
|
|
+
|
|
|
+ Assert.assertNull(zks.outstandingChangesForPath.get("/foo"));
|
|
|
+
|
|
|
+ process(Arrays.asList(
|
|
|
+ Op.setData("/foo", new byte[0], -1)));
|
|
|
+
|
|
|
+ ChangeRecord cr = zks.outstandingChangesForPath.get("/foo");
|
|
|
+ Assert.assertNotNull("Change record wasn't set", cr);
|
|
|
+ Assert.assertEquals("Record zxid wasn't set correctly",
|
|
|
+ 1, cr.zxid);
|
|
|
+
|
|
|
+ process(Arrays.asList(
|
|
|
+ Op.delete("/foo", -1)));
|
|
|
+ cr = zks.outstandingChangesForPath.get("/foo");
|
|
|
+ Assert.assertEquals("Record zxid wasn't set correctly",
|
|
|
+ 2, cr.zxid);
|
|
|
+
|
|
|
+
|
|
|
+ // It should fail and shouldn't change outstanding record.
|
|
|
+ process(Arrays.asList(
|
|
|
+ Op.delete("/foo", -1)));
|
|
|
+ cr = zks.outstandingChangesForPath.get("/foo");
|
|
|
+ // zxid should still be previous result because record's not changed.
|
|
|
+ Assert.assertEquals("Record zxid wasn't set correctly",
|
|
|
+ 2, cr.zxid);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * ZOOKEEPER-2052:
|
|
|
+ * This test checks that if a multi operation aborted, and during the multi there is side effect
|
|
|
+ * that changed outstandingChangesForPath, after aborted the side effect should be removed and
|
|
|
+ * everything should be restored correctly.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testMultiRollbackNoLastChange() throws Exception {
|
|
|
+ zks.getZKDatabase().dataTree.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, 0, 0, 0, 0);
|
|
|
+ zks.getZKDatabase().dataTree.createNode("/foo/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, 0, 0, 0, 0);
|
|
|
+
|
|
|
+ pLatch = new CountDownLatch(1);
|
|
|
+ processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
|
|
|
+
|
|
|
+ Assert.assertNull(zks.outstandingChangesForPath.get("/foo"));
|
|
|
+
|
|
|
+ // multi record:
|
|
|
+ // set "/foo" => succeed, leave a outstanding change
|
|
|
+ // delete "/foo" => fail, roll back change
|
|
|
+ process(Arrays.asList(
|
|
|
+ Op.setData("/foo", new byte[0], -1),
|
|
|
+ Op.delete("/foo", -1)));
|
|
|
+
|
|
|
+ // aborting multi shouldn't leave any record.
|
|
|
+ Assert.assertNull(zks.outstandingChangesForPath.get("/foo"));
|
|
|
}
|
|
|
-
|
|
|
|
|
|
private class MyRequestProcessor implements RequestProcessor {
|
|
|
@Override
|
|
|
public void processRequest(Request request) {
|
|
|
- Assert.assertEquals("Request should have marshalling error", new ErrorTxn(Code.MARSHALLINGERROR.intValue()), request.getTxn());
|
|
|
- testEnd.countDown();
|
|
|
+ // getting called by PrepRequestProcessor
|
|
|
+ outcome = request;
|
|
|
+ pLatch.countDown();
|
|
|
}
|
|
|
@Override
|
|
|
public void shutdown() {
|