Kaynağa Gözat

HDFS-9939. Increase DecompressorStream skip buffer size. Contributed by John Zhuge.

Andrew Wang 9 yıl önce
ebeveyn
işleme
8ef75a702e

+ 30 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java

@@ -22,22 +22,36 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.compress.Decompressor;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class DecompressorStream extends CompressionInputStream {
+  /**
+   * The maximum input buffer size.
+   */
+  private static final int MAX_INPUT_BUFFER_SIZE = 512;
+  /**
+   * MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to
+   * use when skipping. See {@link java.io.InputStream}.
+   */
+  private static final int MAX_SKIP_BUFFER_SIZE = 2048;
+
+  private byte[] skipBytes;
+  private byte[] oneByte = new byte[1];
+
   protected Decompressor decompressor = null;
   protected byte[] buffer;
   protected boolean eof = false;
   protected boolean closed = false;
   private int lastBytesSent = 0;
 
-  public DecompressorStream(InputStream in, Decompressor decompressor,
-                            int bufferSize)
-  throws IOException {
+  @VisibleForTesting
+  DecompressorStream(InputStream in, Decompressor decompressor,
+                            int bufferSize, int skipBufferSize)
+      throws IOException {
     super(in);
 
     if (decompressor == null) {
@@ -48,11 +62,18 @@ public class DecompressorStream extends CompressionInputStream {
 
     this.decompressor = decompressor;
     buffer = new byte[bufferSize];
+    skipBytes = new byte[skipBufferSize];
+  }
+
+  public DecompressorStream(InputStream in, Decompressor decompressor,
+                            int bufferSize)
+      throws IOException {
+    this(in, decompressor, bufferSize, MAX_SKIP_BUFFER_SIZE);
   }
 
   public DecompressorStream(InputStream in, Decompressor decompressor)
-  throws IOException {
-    this(in, decompressor, 512);
+      throws IOException {
+    this(in, decompressor, MAX_INPUT_BUFFER_SIZE);
   }
 
   /**
@@ -64,8 +85,7 @@ public class DecompressorStream extends CompressionInputStream {
   protected DecompressorStream(InputStream in) throws IOException {
     super(in);
   }
-  
-  private byte[] oneByte = new byte[1];
+
   @Override
   public int read() throws IOException {
     checkStream();
@@ -86,7 +106,7 @@ public class DecompressorStream extends CompressionInputStream {
   }
 
   protected int decompress(byte[] b, int off, int len) throws IOException {
-    int n = 0;
+    int n;
 
     while ((n = decompressor.decompress(b, off, len)) == 0) {
       if (decompressor.needsDictionary()) {
@@ -170,7 +190,6 @@ public class DecompressorStream extends CompressionInputStream {
     decompressor.reset();
   }
 
-  private byte[] skipBytes = new byte[512];
   @Override
   public long skip(long n) throws IOException {
     // Sanity checks
@@ -178,7 +197,7 @@ public class DecompressorStream extends CompressionInputStream {
       throw new IllegalArgumentException("negative skip length");
     }
     checkStream();
-    
+
     // Read 'n' bytes
     int skipped = 0;
     while (skipped < n) {

+ 113 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeCompressor.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A fake compressor
+ * Its input and output is the same.
+ */
+class FakeCompressor implements Compressor {
+
+  private boolean finish;
+  private boolean finished;
+  private int nread;
+  private int nwrite;
+
+  private byte[] userBuf;
+  private int userBufOff;
+  private int userBufLen;
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    int n = Math.min(len, userBufLen);
+    if (userBuf != null && b != null)
+      System.arraycopy(userBuf, userBufOff, b, off, n);
+    userBufOff += n;
+    userBufLen -= n;
+    nwrite += n;
+
+    if (finish && userBufLen <= 0)
+      finished = true;
+
+    return n;
+  }
+
+  @Override
+  public void end() {
+    // nop
+  }
+
+  @Override
+  public void finish() {
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    return finished;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return nread;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return nwrite;
+  }
+
+  @Override
+  public boolean needsInput() {
+    return userBufLen <= 0;
+  }
+
+  @Override
+  public void reset() {
+    finish = false;
+    finished = false;
+    nread = 0;
+    nwrite = 0;
+    userBuf = null;
+    userBufOff = 0;
+    userBufLen = 0;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    // nop
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    nread += len;
+    userBuf = b;
+    userBufOff = off;
+    userBufLen = len;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    // nop
+  }
+
+}

+ 109 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeDecompressor.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+
+/**
+ * A fake decompressor, just like FakeCompressor
+ * Its input and output is the same.
+ */
+class FakeDecompressor implements Decompressor {
+
+  private boolean finish;
+  private boolean finished;
+  private int nread;
+  private int nwrite;
+
+  private byte[] userBuf;
+  private int userBufOff;
+  private int userBufLen;
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    int n = Math.min(len, userBufLen);
+    if (userBuf != null && b != null)
+      System.arraycopy(userBuf, userBufOff, b, off, n);
+    userBufOff += n;
+    userBufLen -= n;
+    nwrite += n;
+
+    if (finish && userBufLen <= 0)
+      finished = true;
+
+    return n;
+  }
+
+  @Override
+  public void end() {
+    // nop
+  }
+
+  @Override
+  public boolean finished() {
+    return finished;
+  }
+
+  public long getBytesRead() {
+    return nread;
+  }
+
+  public long getBytesWritten() {
+    return nwrite;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    return false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    return userBufLen <= 0;
+  }
+
+  @Override
+  public void reset() {
+    finish = false;
+    finished = false;
+    nread = 0;
+    nwrite = 0;
+    userBuf = null;
+    userBufOff = 0;
+    userBufLen = 0;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    // nop
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    nread += len;
+    userBuf = b;
+    userBufOff = off;
+    userBufLen = len;
+  }
+
+  @Override
+  public int getRemaining() {
+    return 0;
+  }
+
+}

+ 2 - 178
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBlockDecompressorStream.java

@@ -25,7 +25,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 public class TestBlockDecompressorStream {
@@ -67,187 +66,12 @@ public class TestBlockDecompressorStream {
     bytesIn = new ByteArrayInputStream(buf);
     
     // get decompression stream
-    BlockDecompressorStream blockDecompressorStream = 
-      new BlockDecompressorStream(bytesIn, new FakeDecompressor(), 1024);
-    try {
+    try (BlockDecompressorStream blockDecompressorStream =
+      new BlockDecompressorStream(bytesIn, new FakeDecompressor(), 1024)) {
       assertEquals("return value is not -1", 
           -1 , blockDecompressorStream.read());
     } catch (IOException e) {
       fail("unexpected IOException : " + e);
-    } finally {
-      blockDecompressorStream.close();
     }
   }
-}
-
-/**
- * A fake compressor
- * Its input and output is the same.
- */
-class FakeCompressor implements Compressor{
-
-  private boolean finish;
-  private boolean finished;
-  int nread;
-  int nwrite;
-  
-  byte [] userBuf;
-  int userBufOff;
-  int userBufLen;
-  
-  @Override
-  public int compress(byte[] b, int off, int len) throws IOException {
-    int n = Math.min(len, userBufLen);
-    if (userBuf != null && b != null)
-      System.arraycopy(userBuf, userBufOff, b, off, n);
-    userBufOff += n;
-    userBufLen -= n;
-    nwrite += n;
-    
-    if (finish && userBufLen <= 0)
-      finished = true;   
-        
-    return n;
-  }
-
-  @Override
-  public void end() {
-    // nop
-  }
-
-  @Override
-  public void finish() {
-    finish = true;
-  }
-
-  @Override
-  public boolean finished() {
-    return finished;
-  }
-
-  @Override
-  public long getBytesRead() {
-    return nread;
-  }
-
-  @Override
-  public long getBytesWritten() {
-    return nwrite;
-  }
-
-  @Override
-  public boolean needsInput() {
-    return userBufLen <= 0;
-  }
-
-  @Override
-  public void reset() {
-    finish = false;
-    finished = false;
-    nread = 0;
-    nwrite = 0;
-    userBuf = null;
-    userBufOff = 0;
-    userBufLen = 0;
-  }
-
-  @Override
-  public void setDictionary(byte[] b, int off, int len) {
-    // nop
-  }
-
-  @Override
-  public void setInput(byte[] b, int off, int len) {
-    nread += len;
-    userBuf = b;
-    userBufOff = off;
-    userBufLen = len;
-  }
-
-  @Override
-  public void reinit(Configuration conf) {
-    // nop
-  }
-  
-}
-
-/**
- * A fake decompressor, just like FakeCompressor
- * Its input and output is the same.
- */
-class FakeDecompressor implements Decompressor {
-  
-  private boolean finish;
-  private boolean finished;
-  int nread;
-  int nwrite;
-  
-  byte [] userBuf;
-  int userBufOff;
-  int userBufLen;
-
-  @Override
-  public int decompress(byte[] b, int off, int len) throws IOException {
-    int n = Math.min(len, userBufLen);
-    if (userBuf != null && b != null)
-      System.arraycopy(userBuf, userBufOff, b, off, n);
-    userBufOff += n;
-    userBufLen -= n;
-    nwrite += n;
-    
-    if (finish && userBufLen <= 0)
-      finished = true;
-    
-    return n;
-  }
-
-  @Override
-  public void end() {
-    // nop
-  }
-
-  @Override
-  public boolean finished() {
-    return finished;
-  }
-
-  @Override
-  public boolean needsDictionary() {
-    return false;
-  }
-
-  @Override
-  public boolean needsInput() {
-    return userBufLen <= 0;
-  }
-
-  @Override
-  public void reset() {
-    finish = false;
-    finished = false;
-    nread = 0;
-    nwrite = 0;
-    userBuf = null;
-    userBufOff = 0;
-    userBufLen = 0;
-  }
-
-  @Override
-  public void setDictionary(byte[] b, int off, int len) {
-    // nop
-  }
-
-  @Override
-  public void setInput(byte[] b, int off, int len) {
-    nread += len;
-    userBuf = b;
-    userBufOff = off;
-    userBufLen = len;
-  }
-
-  @Override
-  public int getRemaining() {
-    return 0;
-  }
-  
 }

+ 99 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestDecompressorStream.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDecompressorStream {
+  private static final String TEST_STRING =
+      "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  private ByteArrayInputStream bytesIn;
+  private Decompressor decompressor;
+  private DecompressorStream decompressorStream;
+
+  @Before
+  public void setUp() throws IOException {
+    bytesIn = new ByteArrayInputStream(TEST_STRING.getBytes());
+    decompressor = new FakeDecompressor();
+    decompressorStream =
+        new DecompressorStream(bytesIn, decompressor, 20, 13);
+  }
+
+  @Test
+  public void testReadOneByte() throws IOException {
+    for (int i = 0; i < TEST_STRING.length(); ++i) {
+      assertThat(decompressorStream.read(), is((int) TEST_STRING.charAt(i)));
+    }
+    try {
+      int ret = decompressorStream.read();
+      fail("Not reachable but got ret " + ret);
+    } catch (EOFException e) {
+      // Expect EOF exception
+    }
+  }
+
+  @Test
+  public void testReadBuffer() throws IOException {
+    // 32 buf.length < 52 TEST_STRING.length()
+    byte[] buf = new byte[32];
+    int bytesToRead = TEST_STRING.length();
+    int i = 0;
+    while (bytesToRead > 0) {
+      int n = Math.min(bytesToRead, buf.length);
+      int bytesRead = decompressorStream.read(buf, 0, n);
+      assertTrue(bytesRead > 0 && bytesRead <= n);
+      assertThat(new String(buf, 0, bytesRead),
+          is(TEST_STRING.substring(i, i + bytesRead)));
+      bytesToRead = bytesToRead - bytesRead;
+      i = i + bytesRead;
+    }
+    try {
+      int ret = decompressorStream.read(buf, 0, buf.length);
+      fail("Not reachable but got ret " + ret);
+    } catch (EOFException e) {
+      // Expect EOF exception
+    }
+  }
+
+  @Test
+  public void testSkip() throws IOException {
+    assertThat(decompressorStream.skip(12), is(12L));
+    assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(12)));
+    assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(13)));
+    assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(14)));
+    assertThat(decompressorStream.skip(10), is(10L));
+    assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(25)));
+    try {
+      long ret = decompressorStream.skip(1000);
+      fail("Not reachable but got ret " + ret);
+    } catch (EOFException e) {
+      // Expect EOF exception
+    }
+  }
+}