Browse Source

HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.

(cherry picked from commit 6c32ddad30240a251caaefdf7fec9ff8ad177a7c)
(cherry picked from commit fc9e156484824fcb59faef2b2914f3cb53901b87)
John Zhuge 7 years ago
parent
commit
5e0f4f212d

+ 29 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

@@ -30,20 +30,23 @@ import java.util.EnumSet;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
 import org.apache.hadoop.io.ByteBufferPool;
-
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
@@ -61,7 +64,7 @@ import com.google.common.base.Preconditions;
 public class CryptoInputStream extends FilterInputStream implements 
     Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
     CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 
-    ReadableByteChannel {
+    ReadableByteChannel, CanUnbuffer, StreamCapabilities {
   private final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
@@ -719,4 +722,27 @@ public class CryptoInputStream extends FilterInputStream implements
   public boolean isOpen() {
     return !closed;
   }
+
+  private void cleanDecryptorPool() {
+    decryptorPool.clear();
+  }
+
+  @Override
+  public void unbuffer() {
+    cleanBufferPool();
+    cleanDecryptorPool();
+    StreamCapabilitiesPolicy.unbuffer(in);
+  }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (StringUtils.toLowerCase(capability)) {
+    case StreamCapabilities.READAHEAD:
+    case StreamCapabilities.DROPBEHIND:
+    case StreamCapabilities.UNBUFFER:
+      return true;
+    default:
+      return false;
+    }
+  }
 }

+ 70 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

@@ -27,6 +27,7 @@ import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -102,7 +103,32 @@ public abstract class CryptoStreamsTestBase {
     
     return total;
   }
-  
+
+  private int preadAll(PositionedReadable in, byte[] b, int off, int len)
+      throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (total >= len) {
+        break;
+      }
+      n = in.read(total, b, off + total, len - total);
+    }
+
+    return total;
+  }
+
+  private void preadCheck(PositionedReadable in) throws Exception {
+    byte[] result = new byte[dataLen];
+    int n = preadAll(in, result, 0, dataLen);
+
+    Assert.assertEquals(dataLen, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, 0, expectedData, 0, n);
+    Assert.assertArrayEquals(result, expectedData);
+  }
+
   protected OutputStream getOutputStream(int bufferSize) throws IOException {
     return getOutputStream(bufferSize, key, iv);
   }
@@ -146,7 +172,6 @@ public abstract class CryptoStreamsTestBase {
     // EOF
     n = in.read(result, 0, dataLen);
     Assert.assertEquals(n, -1);
-    in.close();
   }
   
   /** Test crypto writing with different buffer size. */
@@ -730,4 +755,47 @@ public abstract class CryptoStreamsTestBase {
     
     in.close();
   }
+
+  /** Test unbuffer. */
+  @Test(timeout=120000)
+  public void testUnbuffer() throws Exception {
+    OutputStream out = getOutputStream(smallBufferSize);
+    writeData(out);
+
+    // Test buffered read
+    try (InputStream in = getInputStream(smallBufferSize)) {
+      // Test unbuffer after buffered read
+      readCheck(in);
+      ((CanUnbuffer) in).unbuffer();
+
+      if (in instanceof Seekable) {
+        // Test buffered read again after unbuffer
+        // Must seek to the beginning first
+        ((Seekable) in).seek(0);
+        readCheck(in);
+      }
+
+      // Test close after unbuffer
+      ((CanUnbuffer) in).unbuffer();
+      // The close will be called when exiting this try-with-resource block
+    }
+
+    // Test pread
+    try (InputStream in = getInputStream(smallBufferSize)) {
+      if (in instanceof PositionedReadable) {
+        PositionedReadable pin = (PositionedReadable) in;
+
+        // Test unbuffer after pread
+        preadCheck(pin);
+        ((CanUnbuffer) in).unbuffer();
+
+        // Test pread again after unbuffer
+        preadCheck(pin);
+
+        // Test close after unbuffer
+        ((CanUnbuffer) in).unbuffer();
+        // The close will be called when exiting this try-with-resource block
+      }
+    }
+  }
 }

+ 24 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java

@@ -29,11 +29,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -164,16 +166,18 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     }
   }
   
-  public static class FakeInputStream extends InputStream implements 
-      Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
-      CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+  static class FakeInputStream extends InputStream
+      implements Seekable, PositionedReadable, ByteBufferReadable,
+                 HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+                 HasEnhancedByteBufferAccess, CanUnbuffer,
+                 StreamCapabilities {
     private final byte[] oneByteBuf = new byte[1];
     private int pos = 0;
     private final byte[] data;
     private final int length;
     private boolean closed = false;
 
-    public FakeInputStream(DataInputBuffer in) {
+    FakeInputStream(DataInputBuffer in) {
       data = in.getData();
       length = in.getLength();
     }
@@ -354,6 +358,22 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
         UnsupportedOperationException {
     }
 
+    @Override
+    public void unbuffer() {
+    }
+
+    @Override
+    public boolean hasCapability(String capability) {
+      switch (capability.toLowerCase()) {
+      case StreamCapabilities.READAHEAD:
+      case StreamCapabilities.DROPBEHIND:
+      case StreamCapabilities.UNBUFFER:
+        return true;
+      default:
+        return false;
+      }
+    }
+
     @Override
     public FileDescriptor getFileDescriptor() throws IOException {
       return null;

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java

@@ -112,4 +112,9 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testSeekToNewSource() throws Exception {
   }
+
+  @Ignore("Local file input stream does not support unbuffer")
+  @Override
+  @Test
+  public void testUnbuffer() throws Exception {}
 }

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java

@@ -120,4 +120,9 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Override
   @Test(timeout=10000)
   public void testHasEnhancedByteBufferAccess() throws IOException {}
+
+  @Ignore("ByteArrayInputStream does not support unbuffer")
+  @Override
+  @Test
+  public void testUnbuffer() throws Exception {}
 }