|
@@ -244,6 +244,35 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
private RequestThrottler requestThrottler;
|
|
|
public static final String SNAP_COUNT = "zookeeper.snapCount";
|
|
|
|
|
|
+ /**
|
|
|
+ * This setting sets a limit on the total number of large requests that
|
|
|
+ * can be inflight and is designed to prevent ZooKeeper from accepting
|
|
|
+ * too many large requests such that the JVM runs out of usable heap and
|
|
|
+ * ultimately crashes.
|
|
|
+ *
|
|
|
+ * The limit is enforced by the {@link checkRequestSize(int, boolean)}
|
|
|
+ * method which is called by the connection layer ({@link NIOServerCnxn},
|
|
|
+ * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
|
|
|
+ * data off the TCP socket. The limit is then checked again by the
|
|
|
+ * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which
|
|
|
+ * also atomically updates {@link currentLargeRequestBytes}. The request is
|
|
|
+ * then marked as a large request, with the request size stored in the Request
|
|
|
+ * object so that it can later be decremented from {@link currentLargeRequestsBytes}.
|
|
|
+ *
|
|
|
+ * When a request is completed or dropped, the relevant code path calls the
|
|
|
+ * {@link requestFinished(Request)} method which performs the decrement if
|
|
|
+ * needed.
|
|
|
+ */
|
|
|
+ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The size threshold after which a request is considered a large request
|
|
|
+ * and is checked against the large request byte limit.
|
|
|
+ */
|
|
|
+ private volatile int largeRequestThreshold = -1;
|
|
|
+
|
|
|
+ private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
|
|
|
+
|
|
|
void removeCnxn(ServerCnxn cnxn) {
|
|
|
zkDb.removeCnxn(cnxn);
|
|
|
}
|
|
@@ -285,6 +314,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
|
|
|
|
|
|
+ this.initLargeRequestThrottlingSettings();
|
|
|
+
|
|
|
LOG.info("Created server with tickTime " + tickTime
|
|
|
+ " minSessionTimeout " + getMinSessionTimeout()
|
|
|
+ " maxSessionTimeout " + getMaxSessionTimeout()
|
|
@@ -1047,14 +1078,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
}
|
|
|
} else {
|
|
|
LOG.warn("Received packet at server of unknown type " + si.type);
|
|
|
+ // Update request accounting/throttling limits
|
|
|
+ requestFinished(si);
|
|
|
new UnimplementedRequestProcessor().processRequest(si);
|
|
|
}
|
|
|
} catch (MissingSessionException e) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Dropping request: " + e.getMessage());
|
|
|
}
|
|
|
+ // Update request accounting/throttling limits
|
|
|
+ requestFinished(si);
|
|
|
} catch (RequestProcessorException e) {
|
|
|
LOG.error("Unable to process request:" + e.getMessage(), e);
|
|
|
+ // Update request accounting/throttling limits
|
|
|
+ requestFinished(si);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1380,6 +1417,85 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
maxBatchSize = size;
|
|
|
}
|
|
|
|
|
|
+ private void initLargeRequestThrottlingSettings() {
|
|
|
+ setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes));
|
|
|
+ setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1));
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getLargeRequestMaxBytes() {
|
|
|
+ return largeRequestMaxBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setLargeRequestMaxBytes(int bytes) {
|
|
|
+ if (bytes <= 0) {
|
|
|
+ LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
|
|
|
+ LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
|
|
|
+ } else {
|
|
|
+ largeRequestMaxBytes = bytes;
|
|
|
+ LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getLargeRequestThreshold() {
|
|
|
+ return largeRequestThreshold;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setLargeRequestThreshold(int threshold) {
|
|
|
+ if (threshold == 0 || threshold < -1) {
|
|
|
+ LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
|
|
|
+ largeRequestThreshold = -1;
|
|
|
+ } else {
|
|
|
+ largeRequestThreshold = threshold;
|
|
|
+ LOG.info("The large request threshold is set to {}", largeRequestThreshold);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getLargeRequestBytes() {
|
|
|
+ return currentLargeRequestBytes.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isLargeRequest(int length) {
|
|
|
+ // The large request limit is disabled when threshold is -1
|
|
|
+ if (largeRequestThreshold == -1) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return length > largeRequestThreshold;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
|
|
|
+ if (!isLargeRequest(length)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) {
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
|
|
|
+ throw new IOException("Rejecting large request");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
|
|
|
+ if (!isLargeRequest(length)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ int bytes = currentLargeRequestBytes.addAndGet(length);
|
|
|
+ if (bytes > largeRequestMaxBytes) {
|
|
|
+ currentLargeRequestBytes.addAndGet(-length);
|
|
|
+ ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
|
|
|
+ throw new IOException("Rejecting large request");
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void requestFinished(Request request) {
|
|
|
+ int largeRequestLength = request.getLargeRequestSize();
|
|
|
+ if (largeRequestLength != -1) {
|
|
|
+ currentLargeRequestBytes.addAndGet(-largeRequestLength);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
|
|
|
// We have the request, now process and setup for next
|
|
|
InputStream bais = new ByteBufferInputStream(incomingBuffer);
|
|
@@ -1451,6 +1567,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
cnxn.disableRecv();
|
|
|
} else {
|
|
|
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
|
|
|
+ int length = incomingBuffer.limit();
|
|
|
+ if (isLargeRequest(length)) {
|
|
|
+ // checkRequestSize will throw IOException if request is rejected
|
|
|
+ checkRequestSizeWhenMessageReceived(length);
|
|
|
+ si.setLargeRequestSize(length);
|
|
|
+ }
|
|
|
si.setOwner(ServerCnxn.me);
|
|
|
// Always treat packet from the client as a possible
|
|
|
// local request.
|