|
@@ -28,9 +28,7 @@ import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
@@ -84,18 +82,13 @@ public class CommitProcessorTest extends ZKTestCase {
|
|
|
File tmpDir;
|
|
|
ArrayList<TestClientThread> testClients = new ArrayList<TestClientThread>();
|
|
|
CommitProcessor commitProcessor;
|
|
|
- DelayRequestProcessor delayProcessor;
|
|
|
|
|
|
public void setUp(int numCommitThreads, int numClientThreads, int writePercent) throws Exception {
|
|
|
- setUp(numCommitThreads, numClientThreads, writePercent, false);
|
|
|
- }
|
|
|
-
|
|
|
- public void setUp(int numCommitThreads, int numClientThreads, int writePercent, boolean withDelayProcessor) throws Exception {
|
|
|
stopped = false;
|
|
|
System.setProperty(CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, Integer.toString(numCommitThreads));
|
|
|
tmpDir = ClientBase.createTmpDir();
|
|
|
ClientBase.setupTestEnv();
|
|
|
- zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000, withDelayProcessor);
|
|
|
+ zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
|
|
|
zks.startup();
|
|
|
for (int i = 0; i < numClientThreads; ++i) {
|
|
|
TestClientThread client = new TestClientThread(writePercent);
|
|
@@ -218,23 +211,6 @@ public class CommitProcessorTest extends ZKTestCase {
|
|
|
assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testWaitingForWriteToFinishBeforeShutdown() throws Exception {
|
|
|
- setUp(1, 0, 0, true);
|
|
|
-
|
|
|
- // send a single write request
|
|
|
- TestClientThread client = new TestClientThread(0);
|
|
|
- client.sendWriteRequest();
|
|
|
-
|
|
|
- // wait for request being committed
|
|
|
- delayProcessor.waitRequestProcessing();
|
|
|
-
|
|
|
- zks.shutdown();
|
|
|
-
|
|
|
- // Make sure we've finished the in-flight request before shutdown returns
|
|
|
- assertFalse(commitProcessor.isAlive());
|
|
|
- }
|
|
|
-
|
|
|
@Test
|
|
|
public void testNoCommitWorkersMixedWorkload() throws Exception {
|
|
|
int numClients = 10;
|
|
@@ -311,15 +287,8 @@ public class CommitProcessorTest extends ZKTestCase {
|
|
|
|
|
|
private class TestZooKeeperServer extends ZooKeeperServer {
|
|
|
|
|
|
- final boolean withDelayProcessor;
|
|
|
-
|
|
|
public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
|
|
|
- this(snapDir, logDir, tickTime, false);
|
|
|
- }
|
|
|
-
|
|
|
- public TestZooKeeperServer(File snapDir, File logDir, int tickTime, boolean withDelayProcessor) throws IOException {
|
|
|
super(snapDir, logDir, tickTime);
|
|
|
- this.withDelayProcessor = withDelayProcessor;
|
|
|
}
|
|
|
|
|
|
public PrepRequestProcessor getFirstProcessor() {
|
|
@@ -334,12 +303,7 @@ public class CommitProcessorTest extends ZKTestCase {
|
|
|
// ValidateProcessor is set up in a similar fashion to ToBeApplied
|
|
|
// processor, so it can do pre/post validating of requests
|
|
|
ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor);
|
|
|
- if (withDelayProcessor) {
|
|
|
- delayProcessor = new DelayRequestProcessor(validateProcessor);
|
|
|
- commitProcessor = new CommitProcessor(delayProcessor, "1", true, null);
|
|
|
- } else {
|
|
|
- commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
|
|
|
- }
|
|
|
+ commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
|
|
|
validateProcessor.setCommitProcessor(commitProcessor);
|
|
|
commitProcessor.start();
|
|
|
MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(commitProcessor);
|
|
@@ -350,46 +314,6 @@ public class CommitProcessorTest extends ZKTestCase {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private class DelayRequestProcessor implements RequestProcessor {
|
|
|
- // delay 1s for each request
|
|
|
- static final int DEFAULT_DELAY = 1000;
|
|
|
- RequestProcessor nextProcessor;
|
|
|
- CountDownLatch waitingProcessRequestBeingCalled;
|
|
|
-
|
|
|
- public DelayRequestProcessor(RequestProcessor nextProcessor) {
|
|
|
- this.nextProcessor = nextProcessor;
|
|
|
- this.waitingProcessRequestBeingCalled = new CountDownLatch(1);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void processRequest(Request request) throws RequestProcessorException {
|
|
|
- try {
|
|
|
- this.waitingProcessRequestBeingCalled.countDown();
|
|
|
- LOG.info("Sleeping {} ms for request {}", DEFAULT_DELAY, request);
|
|
|
- Thread.sleep(DEFAULT_DELAY);
|
|
|
- } catch (InterruptedException e) { /* ignore */ }
|
|
|
- nextProcessor.processRequest(request);
|
|
|
- }
|
|
|
-
|
|
|
- public void waitRequestProcessing() {
|
|
|
- try {
|
|
|
- if (!waitingProcessRequestBeingCalled.await(3000, TimeUnit.MILLISECONDS)) {
|
|
|
- LOG.info("Did not see request processing in 3s");
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOG.info("Interrupted when waiting for processRequest being called");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void shutdown() {
|
|
|
- LOG.info("shutdown DelayRequestProcessor");
|
|
|
- if (nextProcessor != null) {
|
|
|
- nextProcessor.shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private class MockProposalRequestProcessor extends Thread implements RequestProcessor {
|
|
|
|
|
|
private final CommitProcessor commitProcessor;
|