Преглед на файлове

ZOOKEEPER-3496: Transaction larger than jute.maxbuffer makes ZooKeeper service unavailable

Author: Mohammad Arshad <arshad.mohammad.k@gmail.com>
Author: Mohammad Arshad <arshad@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Michael Han <hanm@apache.org>

Closes #1096 from arshadmohammad/ZOOKEEPER-3496-master
Mohammad Arshad преди 5 години
родител
ревизия
4279758ead

+ 11 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md

@@ -1387,6 +1387,17 @@ the variable does.
     problems will arise. This is really a sanity check. ZooKeeper is
     designed to store data on the order of kilobytes in size.
 
+* *jute.maxbuffer.extrasize*:
+    (Java system property: **zookeeper.jute.maxbuffer.extrasize**)
+    **New in 3.5.7:**
+    While processing client requests ZooKeeper server adds some additional information into 
+    the requests before persisting it as a transaction. Earlier this additional information size 
+    was fixed to 1024 bytes. For many scenarios, specially scenarios where jute.maxbuffer value
+    is more than 1 MB and request type is multi, this fixed size was insufficient.
+    To handle all the scenarios additional information size is increased from 1024 byte 
+    to same as jute.maxbuffer size and also it is made configurable through jute.maxbuffer.extrasize.
+    Generally this property is not required to be configured as default value is the most optimal value.
+
 * *skipACL* :
     (Java system property: **zookeeper.skipACL**)
     Skips ACL checks. This results in a boost in throughput,

+ 21 - 1
zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java

@@ -34,8 +34,22 @@ public class BinaryInputArchive implements InputArchive {
     // CHECKSTYLE.OFF: ConstantName - for backward compatibility
     public static final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);
     // CHECKSTYLE.ON:
+    private static final int extraMaxBuffer;
+
+    static {
+        final Integer configuredExtraMaxBuffer =
+            Integer.getInteger("zookeeper.jute.maxbuffer.extrasize", maxBuffer);
+        if (configuredExtraMaxBuffer < 1024) {
+            // Earlier hard coded value was 1024, So the value should not be less than that value
+            extraMaxBuffer = 1024;
+        } else {
+            extraMaxBuffer = configuredExtraMaxBuffer;
+        }
+    }
 
     private DataInput in;
+    private int maxBufferSize;
+    private int extraMaxBufferSize;
 
     public static BinaryInputArchive getArchive(InputStream strm) {
         return new BinaryInputArchive(new DataInputStream(strm));
@@ -61,7 +75,13 @@ public class BinaryInputArchive implements InputArchive {
      * Creates a new instance of BinaryInputArchive.
      */
     public BinaryInputArchive(DataInput in) {
+        this(in, maxBuffer, extraMaxBuffer);
+    }
+
+    public BinaryInputArchive(DataInput in, int maxBufferSize, int extraMaxBufferSize) {
         this.in = in;
+        this.maxBufferSize = maxBufferSize;
+        this.extraMaxBufferSize = extraMaxBufferSize;
     }
 
     public byte readByte(String tag) throws IOException {
@@ -142,7 +162,7 @@ public class BinaryInputArchive implements InputArchive {
     // make up for extra fields, etc. (otherwise e.g. clients may be able to
     // write buffers larger than we can read from disk!)
     private void checkLength(int len) throws IOException {
-        if (len < 0 || len > maxBuffer + 1024) {
+        if (len < 0 || len > maxBufferSize + extraMaxBufferSize) {
             throw new IOException(UNREASONBLE_LENGTH + len);
         }
     }

+ 57 - 0
zookeeper-jute/src/test/java/org/apache/jute/BinaryInputArchiveTest.java

@@ -20,10 +20,13 @@ package org.apache.jute;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import org.junit.Test;
 
@@ -137,5 +140,59 @@ public class BinaryInputArchiveTest {
                 }
         );
     }
+  /**
+   * Record length is more than the maxbuffer + extrasize length.
+   */
+  @Test
+  public void testReadStringForRecordsHavingLengthMoreThanMaxAllowedSize() {
+    int maxBufferSize = 2000;
+    int extraMaxBufferSize = 1025;
+    //this record size is more than the max allowed size
+    int recordSize = maxBufferSize + extraMaxBufferSize + 100;
+    BinaryInputArchive ia =
+        getBinaryInputArchive(recordSize, maxBufferSize, extraMaxBufferSize);
+    try {
+      ia.readString("");
+      fail("Should have thrown an IOException");
+    } catch (IOException e) {
+      assertTrue("Not 'Unreasonable length' exception: " + e,
+          e.getMessage().startsWith(BinaryInputArchive.UNREASONBLE_LENGTH));
+    }
+  }
+
+  /**
+   * Record length is less than then maxbuffer + extrasize length.
+   */
+  @Test
+  public void testReadStringForRecordsHavingLengthLessThanMaxAllowedSize()
+      throws IOException {
+    int maxBufferSize = 2000;
+    int extraMaxBufferSize = 1025;
+    int recordSize = maxBufferSize + extraMaxBufferSize - 100;
+    //Exception is not expected as record size is less than the allowed size
+    BinaryInputArchive ia =
+        getBinaryInputArchive(recordSize, maxBufferSize, extraMaxBufferSize);
+    String s = ia.readString("");
+    assertNotNull(s);
+    assertEquals(recordSize, s.getBytes().length);
+  }
+
+  private BinaryInputArchive getBinaryInputArchive(int recordSize, int maxBufferSize,
+      int extraMaxBufferSize) {
+    byte[] data = getData(recordSize);
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
+    return new BinaryInputArchive(dis, maxBufferSize, extraMaxBufferSize);
+  }
+
+  private byte[] getData(int recordSize) {
+    ByteBuffer buf = ByteBuffer.allocate(recordSize + 4);
+    buf.putInt(recordSize);
+    byte[] bytes = new byte[recordSize];
+    for (int i = 0; i < recordSize; i++) {
+      bytes[i] = (byte) 'a';
+    }
+    buf.put(bytes);
+    return buf.array();
+  }
 
 }