Explorar o código

ZOOKEEPER-3598: Fix potential data inconsistency issue due to CommitProcessor not gracefully shutdown

Note: this change uses exit code 16 for SHUTDOWN_UNGRACEFULLY because exit code 15 is already used internally for another exit code, which will be upstreamed later.

Author: Fangmin Lyu <fangmin@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Michael Han <hanm@apache.org>

Closes #1130 from lvfangmin/ZOOKEEPER-3598
Fangmin Lyu hai 5 anos
pai
achega
79f99af818

+ 4 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java

@@ -48,7 +48,10 @@ public enum ExitCode {
     QUORUM_PACKET_ERROR(13),
 
     /** Unable to bind to the quorum (election) port after multiple retry */
-    UNABLE_TO_BIND_QUORUM_PORT(14);
+    UNABLE_TO_BIND_QUORUM_PORT(14),
+
+    /** Failed to shutdown the request processor pipeline gracefully **/
+    SHUTDOWN_UNGRACEFULLY(16);
 
     private final int value;
 

+ 15 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerMetrics;
@@ -621,6 +622,20 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
             workerPool.join(workerShutdownTimeoutMS);
         }
 
+        try {
+            this.join(workerShutdownTimeoutMS);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for CommitProcessor to finish");
+            Thread.currentThread().interrupt();
+        }
+
+        if (this.isAlive()) {
+            LOG.warn("CommitProcessor does not shutdown gracefully after "
+                    + "waiting for {} ms, exit to avoid potential "
+                    + "inconsistency issue", workerShutdownTimeoutMS);
+            System.exit(ExitCode.SHUTDOWN_UNGRACEFULLY.getValue());
+        }
+
         if (nextProcessor != null) {
             nextProcessor.shutdown();
         }

+ 78 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java

@@ -28,7 +28,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.jute.BinaryOutputArchive;
@@ -82,13 +84,18 @@ public class CommitProcessorTest extends ZKTestCase {
     File tmpDir;
     ArrayList<TestClientThread> testClients = new ArrayList<TestClientThread>();
     CommitProcessor commitProcessor;
+    DelayRequestProcessor delayProcessor;
 
     public void setUp(int numCommitThreads, int numClientThreads, int writePercent) throws Exception {
+        setUp(numCommitThreads, numClientThreads, writePercent, false);
+    }
+
+    public void setUp(int numCommitThreads, int numClientThreads, int writePercent, boolean withDelayProcessor) throws Exception {
         stopped = false;
         System.setProperty(CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, Integer.toString(numCommitThreads));
         tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
-        zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
+        zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000, withDelayProcessor);
         zks.startup();
         for (int i = 0; i < numClientThreads; ++i) {
             TestClientThread client = new TestClientThread(writePercent);
@@ -211,6 +218,23 @@ public class CommitProcessorTest extends ZKTestCase {
         assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
     }
 
+    @Test
+    public void testWaitingForWriteToFinishBeforeShutdown() throws Exception {
+        setUp(1, 0, 0, true);
+
+        // send a single write request
+        TestClientThread client = new TestClientThread(0);
+        client.sendWriteRequest();
+
+        // wait for request being committed
+        delayProcessor.waitRequestProcessing();
+
+        zks.shutdown();
+
+        // Make sure we've finished the in-flight request before shutdown returns
+        assertFalse(commitProcessor.isAlive());
+    }
+
     @Test
     public void testNoCommitWorkersMixedWorkload() throws Exception {
         int numClients = 10;
@@ -287,8 +311,15 @@ public class CommitProcessorTest extends ZKTestCase {
 
     private class TestZooKeeperServer extends ZooKeeperServer {
 
+        final boolean withDelayProcessor;
+
         public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
+            this(snapDir, logDir, tickTime, false);
+        }
+
+        public TestZooKeeperServer(File snapDir, File logDir, int tickTime, boolean withDelayProcessor) throws IOException {
             super(snapDir, logDir, tickTime);
+            this.withDelayProcessor = withDelayProcessor;
         }
 
         public PrepRequestProcessor getFirstProcessor() {
@@ -303,7 +334,12 @@ public class CommitProcessorTest extends ZKTestCase {
             // ValidateProcessor is set up in a similar fashion to ToBeApplied
             // processor, so it can do pre/post validating of requests
             ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor);
-            commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
+            if (withDelayProcessor) {
+                delayProcessor = new DelayRequestProcessor(validateProcessor);
+                commitProcessor = new CommitProcessor(delayProcessor, "1", true, null);
+            } else {
+                commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
+            }
             validateProcessor.setCommitProcessor(commitProcessor);
             commitProcessor.start();
             MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(commitProcessor);
@@ -314,6 +350,46 @@ public class CommitProcessorTest extends ZKTestCase {
 
     }
 
+    private class DelayRequestProcessor implements RequestProcessor {
+        // delay 1s for each request
+        static final int DEFAULT_DELAY = 1000;
+        RequestProcessor nextProcessor;
+        CountDownLatch waitingProcessRequestBeingCalled;
+
+        public DelayRequestProcessor(RequestProcessor nextProcessor) {
+            this.nextProcessor = nextProcessor;
+            this.waitingProcessRequestBeingCalled = new CountDownLatch(1);
+        }
+
+        @Override
+        public void processRequest(Request request) throws RequestProcessorException {
+            try {
+                this.waitingProcessRequestBeingCalled.countDown();
+                LOG.info("Sleeping {} ms for request {}", DEFAULT_DELAY, request);
+                Thread.sleep(DEFAULT_DELAY);
+            } catch (InterruptedException e) { /* ignore */ }
+            nextProcessor.processRequest(request);
+        }
+
+        public void waitRequestProcessing() {
+            try {
+                if (!waitingProcessRequestBeingCalled.await(3000, TimeUnit.MILLISECONDS)) {
+                    LOG.info("Did not see request processing in 3s");
+                }
+            } catch (InterruptedException e) {
+                LOG.info("Interrupted when waiting for processRequest being called");
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            LOG.info("shutdown DelayRequestProcessor");
+            if (nextProcessor != null) {
+                nextProcessor.shutdown();
+            }
+        }
+    }
+
     private class MockProposalRequestProcessor extends Thread implements RequestProcessor {
 
         private final CommitProcessor commitProcessor;