Bläddra i källkod

ZOOKEEPER-1928. add configurable throttling to the number of snapshots concurrently sent by a leader (Edward Carter via fpj)


git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1601516 13f79535-47bb-0310-9956-ffa450edef68
Flavio Paiva Junqueira 11 år sedan
förälder
incheckning
bce5c62c62

+ 7 - 1
CHANGES.txt

@@ -29,6 +29,9 @@ NEW FEATURES:
   ZOOKEEPER-1887. C implementation of removeWatches (Raul Gutierrez Segales via
   michim)
 
+  ZOOKEEPER-1928. add configurable throttling to the number of snapshots
+  concurrently sent by a leader (Edward Carter via fpj)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString
@@ -729,7 +732,7 @@ IMPROVEMENTS:
   ZOOKEEPER-756. some cleanup and improvements for zooinspector
   (Colin Goodheart-Smithe & Thomas Koch via phunt)
 
-  ZOOKEEPER-1292. FLETest is flaky (flp via breed)
+  ZOOKEEPER-1292. FLETest is flaky (fpj via breed)
 
   ZOOKEEPER-1326. The CLI commands "delete" and "rmr" are confusing.
   Can we have "rm" + "rmr" instead? (Harsh J via phunt)
@@ -923,6 +926,9 @@ IMPROVEMENTS:
 
   ZOOKEEPER-1659. Add JMX support for dynamic reconfiguration (Rakesh R via
   michim)
+  
+  ZOOKEEPER-1928. add configurable throttling to the number of snapshots
+  concurrently sent by a leader (Edward Carter via fpj)
 
 headers
 

+ 19 - 0
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -72,6 +72,21 @@ public class Leader {
         }
     }
 
+    // Throttle when there are too many concurrent snapshots being sent to observers
+    // Both settings below are read from JVM system properties in the static
+    // initializer and are fixed from then on (the fields are final).
+    // Name of the system property holding the concurrent-snapshot limit.
+    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
+    // Limit handed to the throttler; defaults to 10 when the property is unset.
+    private static final int maxConcurrentSnapshots;
+    // Name of the system property holding the throttle wait timeout.
+    private static final String MAX_CONCURRENT_SNAPSHOT_TIMEOUT = "zookeeper.leader.maxConcurrentSnapshotTimeout";
+    // Timeout handed to the throttler as its timeoutMillis argument, i.e. in
+    // milliseconds; defaults to 5 when the property is unset.
+    private static final long maxConcurrentSnapshotTimeout;
+    static {
+        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
+        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
+        maxConcurrentSnapshotTimeout = Long.getLong(MAX_CONCURRENT_SNAPSHOT_TIMEOUT, 5);
+        LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout);
+    }
+
+    // One throttler per Leader instance, built from the static settings above.
+    // NOTE(review): the throttler constructor rejects a limit below 1 or a
+    // negative timeout with IllegalArgumentException, so a bad property value
+    // surfaces when this field is initialized.
+    private final LearnerSnapshotThrottler learnerSnapshotThrottler = 
+        new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+
     final LeaderZooKeeperServer zk;
 
     final QuorumPeer self;
@@ -1051,6 +1066,10 @@ public class Leader {
         }
         return p;
     }
+    
+    /**
+     * Returns the throttler this leader uses to limit concurrent snapshots.
+     *
+     * @return the {@link LearnerSnapshotThrottler} held by this leader
+     */
+    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
+        return learnerSnapshotThrottler;
+    }
 
     /**
      * Process sync requests

+ 24 - 13
src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -423,20 +423,29 @@ public class LearnerHandler extends ZooKeeperThread {
 
             /* if we are not truncating or sending a diff just send a snapshot */
             if (needSnap) {
-                long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
-                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
-                bufferedOutput.flush();
+                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
+                LearnerSnapshot snapshot = 
+                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
+                try {
+                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
+                    bufferedOutput.flush();
 
-                LOG.info("Sending snapshot last zxid of peer is 0x"
-                        + Long.toHexString(peerLastZxid) + " "
-                        + "zxid of leader is 0x"
-                        + Long.toHexString(leaderLastZxid) + " "
-                        + "sent zxid of db as 0x"
-                        + Long.toHexString(zxidToSend));
-                // Dump data to peer
-                leader.zk.getZKDatabase().serializeSnapshot(oa);
-                oa.writeString("BenWasHere", "signature");
-                bufferedOutput.flush();
+                    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+                            + "send zxid of db as 0x{}, {} concurrent snapshots, " 
+                            + "snapshot was {} from throttle",
+                            Long.toHexString(peerLastZxid), 
+                            Long.toHexString(leaderLastZxid),
+                            Long.toHexString(zxidToSend), 
+                            snapshot.getConcurrentSnapshotNumber(),
+                            snapshot.isEssential() ? "exempt" : "not exempt");
+                    // Dump data to peer
+                    leader.zk.getZKDatabase().serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", "signature");
+                    bufferedOutput.flush();
+                } finally {
+                    snapshot.close();
+                }
             }
 
             // Start thread that blast packets in the queue to learner
@@ -580,6 +589,8 @@ public class LearnerHandler extends ZooKeeperThread {
             }
         } catch (InterruptedException e) {
             LOG.error("Unexpected exception causing shutdown", e);
+        } catch (SnapshotThrottleException e) {
+            LOG.error("too many concurrent snapshots: " + e);
         } finally {
             LOG.warn("******* GOODBYE "
                     + (sock != null ? sock.getRemoteSocketAddress() : "<null>")

+ 44 - 0
src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * Handle for a single snapshot that has been admitted by a
+ * {@link LearnerSnapshotThrottler}.  It remembers the concurrency count
+ * observed when the snapshot started and whether the snapshot was flagged
+ * as essential; {@link #close()} reports completion back to the throttler.
+ */
+public class LearnerSnapshot {
+    private final LearnerSnapshotThrottler throttler;
+    private final int concurrentSnapshotNumber;
+    private final boolean essential;
+
+    /**
+     * @param owner throttler to notify when this snapshot is closed
+     * @param snapshotNumber number of snapshots in progress, including this
+     *                       one, at the time it was started
+     * @param exempt whether this snapshot was started as essential
+     */
+    LearnerSnapshot(LearnerSnapshotThrottler owner,
+            int snapshotNumber, boolean exempt) {
+        throttler = owner;
+        concurrentSnapshotNumber = snapshotNumber;
+        essential = exempt;
+    }
+
+    /**
+     * @return <code>true</code> if this snapshot was started as essential
+     */
+    public boolean isEssential() {
+        return essential;
+    }
+
+    /**
+     * @return the in-progress snapshot count recorded when this snapshot
+     *         was started
+     */
+    public int getConcurrentSnapshotNumber() {
+        return concurrentSnapshotNumber;
+    }
+
+    /**
+     * Tells the owning throttler that this snapshot has ended.  Each call
+     * forwards to {@link LearnerSnapshotThrottler#endSnapshot()}, so it
+     * should be invoked exactly once per snapshot.
+     */
+    public void close() {
+        throttler.endSnapshot();
+    }
+}

+ 137 - 0
src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to limit the number of concurrent snapshots from a leader to
+ * observers and followers.  {@link LearnerHandler} objects should call
+ * {@link #beginSnapshot(boolean)} before sending a snapshot and
+ * {@link #endSnapshot()} after finishing, successfully or not.
+ *
+ * <p>The in-progress counter is only read and written while holding a
+ * private monitor, so one instance can be shared by several threads.
+ */
+public class LearnerSnapshotThrottler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(LearnerSnapshotThrottler.class);
+
+    // Monitor guarding snapsInProgress; waiters in beginSnapshot block on it.
+    private final Object snapCountSyncObject = new Object();
+    private int snapsInProgress;
+
+    private final int maxConcurrentSnapshots;
+    private final long timeoutMillis;
+
+    /**
+     * Constructs a new instance limiting the concurrent number of snapshots to
+     * <code>maxConcurrentSnapshots</code>.
+     * @param maxConcurrentSnapshots maximum concurrent number of snapshots
+     * @param timeoutMillis milliseconds to attempt to wait when attempting to
+     *                      begin a snapshot that would otherwise be throttled;
+     *                      a value of zero means no waiting will be attempted
+     * @throws java.lang.IllegalArgumentException when <code>timeoutMillis</code>
+     *                                            is negative or
+     *                                            <code>maxConcurrentSnapshots</code>
+     *                                            is less than 1
+     */
+    public LearnerSnapshotThrottler(int maxConcurrentSnapshots,
+                                    long timeoutMillis) {
+        if (timeoutMillis < 0) {
+            String errorMsg = "timeout cannot be negative, was " + timeoutMillis;
+            throw new IllegalArgumentException(errorMsg);
+        }
+        if (maxConcurrentSnapshots <= 0) {
+            String errorMsg = "maxConcurrentSnapshots must be positive, was " +
+                    maxConcurrentSnapshots;
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        this.maxConcurrentSnapshots = maxConcurrentSnapshots;
+        this.timeoutMillis = timeoutMillis;
+
+        synchronized (snapCountSyncObject) {
+            snapsInProgress = 0;
+        }
+    }
+
+    /**
+     * Constructs a new instance that never waits: a non-essential snapshot
+     * that would exceed the limit is rejected immediately.
+     * @param maxConcurrentSnapshots maximum concurrent number of snapshots
+     * @throws java.lang.IllegalArgumentException when
+     *                                            <code>maxConcurrentSnapshots</code>
+     *                                            is less than 1
+     */
+    public LearnerSnapshotThrottler(int maxConcurrentSnapshots) {
+        this(maxConcurrentSnapshots, 0);
+    }
+
+    /**
+     * Indicates that a new snapshot is about to be sent.
+     * 
+     * @param essential if <code>true</code>, do not throw an exception even
+     *                  if throttling limit is reached
+     * @return a handle describing the admitted snapshot; the caller must
+     *         close it (or call {@link #endSnapshot()}) when done
+     * @throws SnapshotThrottleException if throttling limit has been exceeded
+     *                                   and <code>essential == false</code>,
+     *                                   even after waiting for the timeout
+     *                                   period, if any
+     * @throws InterruptedException if thread is interrupted while trying
+     *                              to start a snapshot; cannot happen if
+     *                              timeout is zero
+     */
+    public LearnerSnapshot beginSnapshot(boolean essential)
+            throws SnapshotThrottleException, InterruptedException {
+        int snapshotNumber;
+
+        synchronized (snapCountSyncObject) {
+            if (!essential && timeoutMillis > 0) {
+                // Wait against one deadline measured from the first attempt.
+                // A wake-up (notify, spurious, or a slot taken by another
+                // thread first) re-checks the count and, if still throttled,
+                // waits only for the time that is left.  The wait is never
+                // issued with 0, which Object.wait treats as "forever".
+                final long startNanos = System.nanoTime();
+                long remainingMillis = timeoutMillis;
+                while (snapsInProgress >= maxConcurrentSnapshots
+                       && remainingMillis > 0) {
+                    snapCountSyncObject.wait(remainingMillis);
+                    long elapsedMillis =
+                            (System.nanoTime() - startNanos) / 1000000L;
+                    remainingMillis = timeoutMillis - elapsedMillis;
+                }
+            }
+
+            if (essential || snapsInProgress < maxConcurrentSnapshots) {
+                snapsInProgress++;
+                snapshotNumber = snapsInProgress;
+            } else {
+                throw new SnapshotThrottleException(snapsInProgress + 1,
+                                                    maxConcurrentSnapshots);
+            }
+        }
+
+        return new LearnerSnapshot(this, snapshotNumber, essential);
+    }
+
+    /**
+     * Indicates that a snapshot has been completed.  Decrements the
+     * in-progress count and wakes one thread waiting in
+     * {@link #beginSnapshot(boolean)}, if any.  A count that drops below
+     * zero (more ends than begins) is logged as an error.
+     */
+    public void endSnapshot() {
+        int newCount;
+        synchronized (snapCountSyncObject) {
+            snapsInProgress--;
+            newCount = snapsInProgress;
+            snapCountSyncObject.notify();
+        }
+
+        // Log outside the monitor so logging never delays waiters.
+        if (newCount < 0) {
+            String errorMsg =
+                    "endSnapshot() called incorrectly; current snapshot count is "
+                            + newCount;
+            LOG.error(errorMsg);
+        }
+    }
+}

+ 39 - 0
src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * Signals that a {@link Leader} already has too many snapshots being sent
+ * concurrently to observers, so another one was refused.
+ * 
+ * @see LearnerSnapshotThrottler
+ *
+ */
+public class SnapshotThrottleException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds the exception with a message naming both figures.
+     *
+     * @param concurrentSnapshotNumber number of snapshots that would be in
+     *                                 progress had the new one been admitted
+     * @param throttleThreshold maximum number of concurrent snapshots allowed
+     */
+    public SnapshotThrottleException(int concurrentSnapshotNumber, int throttleThreshold) {
+        super(String.format(
+                "new snapshot would make %d concurrently in progress; maximum is %d",
+                concurrentSnapshotNumber,
+                throttleThreshold));
+    }
+}

+ 214 - 0
src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java

@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link LearnerSnapshotThrottler}: limit enforcement, the
+ * exemption for essential snapshots, the {@link LearnerSnapshot} handle, and
+ * behavior under concurrent callers with and without a wait timeout.
+ */
+public class LearnerSnapshotThrottlerTest extends ZKTestCase {
+    /** The sixth non-essential snapshot against a limit of five is rejected. */
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTooManySnapshotsNonessential() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        for (int i = 0; i < 6; i++) {
+            throttler.beginSnapshot(false);
+        }
+    }
+
+    /**
+     * Essential snapshots may exceed the limit, but they still count: after
+     * six essential begins and one end, five remain in progress, so a
+     * non-essential snapshot is rejected.
+     */
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTooManySnapshotsEssential() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSnapshot(true);
+            }
+        }
+        catch (SnapshotThrottleException ex) {
+            Assert.fail("essential snapshots should not be throttled");
+        }
+        throttler.endSnapshot();
+        throttler.beginSnapshot(false);
+    }
+
+    /**
+     * Once the count has dropped below the limit, non-essential snapshots
+     * are admitted again as long as each begin is preceded by an end.
+     */
+    @Test
+    public void testNoThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSnapshot(true);
+            }
+        }
+        catch (SnapshotThrottleException ex) {
+            Assert.fail("essential snapshots should not be throttled");
+        }
+        throttler.endSnapshot();
+        for (int i = 0; i < 5; i++) {
+            throttler.endSnapshot();
+            throttler.beginSnapshot(false);
+        }
+    }
+
+    /** Sequential begin/close pairs never trip a limit of one. */
+    @Test
+    public void testTryWithResourceNoThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
+        for (int i = 0; i < 3; i++) {
+            LearnerSnapshot snapshot = throttler.beginSnapshot(false);
+            try {
+                Assert.assertFalse(snapshot.isEssential());
+                Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
+            } finally {
+                snapshot.close();
+            }
+        }
+    }
+
+    /** A nested non-essential snapshot is rejected while the outer one is open. */
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTryWithResourceThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
+        LearnerSnapshot outer = throttler.beginSnapshot(true);
+        try {
+            LearnerSnapshot inner = throttler.beginSnapshot(false);
+            try {
+                Assert.fail("shouldn't be able to have both snapshots open");
+            } finally {
+                inner.close();
+            }
+        } finally {
+            outer.close();
+        }
+    }
+
+    /**
+     * With the limit equal to the thread count, every thread can hold a
+     * snapshot at the same time without being throttled.
+     */
+    @Test
+    public void testParallelNoThrottle() throws Exception {
+        final int numThreads = 50;
+
+        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(numThreads);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        try {
+            final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+            final CountDownLatch snapshotProgressLatch = new CountDownLatch(numThreads);
+
+            List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
+            for (int i = 0; i < numThreads; i++) {
+                results.add(threadPool.submit(new Callable<Boolean>() {
+
+                    @Override
+                    public Boolean call() {
+                        threadStartLatch.countDown();
+                        try {
+                            // Start all workers together.
+                            threadStartLatch.await();
+
+                            throttler.beginSnapshot(false);
+
+                            // Hold the snapshot until every worker has one.
+                            snapshotProgressLatch.countDown();
+                            snapshotProgressLatch.await();
+
+                            throttler.endSnapshot();
+                        }
+                        catch (Exception e) {
+                            return false;
+                        }
+
+                        return true;
+                    }
+                }));
+            }
+
+            for (Future<Boolean> result : results) {
+                Assert.assertTrue(result.get());
+            }
+        } finally {
+            // Release the worker threads even when an assertion fails.
+            threadPool.shutdownNow();
+        }
+    }
+
+    /**
+     * A second snapshot that arrives while the single slot is taken succeeds
+     * when the first one is closed within the timeout.
+     */
+    @Test
+    public void testPositiveTimeout() throws Exception {
+        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1, 200);
+        ExecutorService threadPool = Executors.newFixedThreadPool(1);
+        try {
+            LearnerSnapshot first = throttler.beginSnapshot(false);
+            final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
+
+            Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
+                @Override
+                public Boolean call() {
+                    try {
+                        snapshotProgressLatch.countDown();
+                        LearnerSnapshot second = throttler.beginSnapshot(false);
+                        second.close();
+                    }
+                    catch (Exception e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            });
+
+            // Only free the slot once the worker is about to ask for it.
+            snapshotProgressLatch.await();
+
+            first.close();
+
+            Assert.assertTrue(result.get());
+        } finally {
+            // Release the worker thread even when an assertion fails.
+            threadPool.shutdownNow();
+        }
+    }
+
+    /**
+     * Twenty threads compete for two slots with a 200 ms timeout; each must
+     * be admitted and must never observe more than two snapshots in progress.
+     */
+    @Test
+    public void testHighContentionWithTimeout() throws Exception {
+        int numThreads = 20;
+
+        final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(2, 200);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        try {
+            final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+            final CountDownLatch snapshotProgressLatch = new CountDownLatch(numThreads);
+
+            List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(numThreads);
+            for (int i = 0; i < numThreads; i++) {
+                results.add(threadPool.submit(new Callable<Boolean>() {
+
+                    @Override
+                    public Boolean call() {
+                        threadStartLatch.countDown();
+                        try {
+                            // Start all workers together.
+                            threadStartLatch.await();
+
+                            LearnerSnapshot snap = throttler.beginSnapshot(false);
+
+                            int snapshotNumber = snap.getConcurrentSnapshotNumber();
+
+                            throttler.endSnapshot();
+
+                            return snapshotNumber <= 2;
+                        }
+                        catch (Exception e) {
+                            return false;
+                        }
+                    }
+                }));
+            }
+
+            for (Future<Boolean> result : results) {
+                Assert.assertTrue(result.get());
+            }
+        } finally {
+            // Release the worker threads even when an assertion fails.
+            threadPool.shutdownNow();
+        }
+    }
+}