
HADOOP-10632. Minor improvements to Crypto input and output streams. Contributed by Yi Liu

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1598485 13f79535-47bb-0310-9956-ffa450edef68
Yi Liu, 11 years ago
parent
commit
9c2848e076

+ 4 - 1
hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt

@@ -12,7 +12,10 @@ fs-encryption (Unreleased)
     interfaces. (Yi Liu and Charles Lamb)
 
     HADOOP-10628. Javadoc and few code style improvement for Crypto
-    input and output streams. (yliu via clamb)
+    input and output streams. (Yi Liu via clamb)
+
+    HADOOP-10632. Minor improvements to Crypto input and output streams. 
+    (Yi Liu)
 
   OPTIMIZATIONS
 

+ 20 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java

@@ -17,15 +17,12 @@
  */
 package org.apache.hadoop.crypto;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
 
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class AESCTRCryptoCodec extends CryptoCodec {
   /**
@@ -33,6 +30,7 @@ public abstract class AESCTRCryptoCodec extends CryptoCodec {
    * @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
    */
   private static final int AES_BLOCK_SIZE = 16;
+  private static final int CTR_OFFSET = 8;
 
   @Override
   public int getAlgorithmBlockSize() {
@@ -48,10 +46,23 @@ public abstract class AESCTRCryptoCodec extends CryptoCodec {
     Preconditions.checkArgument(initIV.length == AES_BLOCK_SIZE);
     Preconditions.checkArgument(IV.length == AES_BLOCK_SIZE);
     
-    final ByteBuffer buf = ByteBuffer.wrap(IV);
-    buf.put(initIV);
-    buf.order(ByteOrder.BIG_ENDIAN);
-    counter += buf.getLong(AES_BLOCK_SIZE - 8);
-    buf.putLong(AES_BLOCK_SIZE - 8, counter);
+    System.arraycopy(initIV, 0, IV, 0, CTR_OFFSET);
+    long l = (initIV[CTR_OFFSET + 0] << 56)
+        + ((initIV[CTR_OFFSET + 1] & 0xFF) << 48)
+        + ((initIV[CTR_OFFSET + 2] & 0xFF) << 40)
+        + ((initIV[CTR_OFFSET + 3] & 0xFF) << 32)
+        + ((initIV[CTR_OFFSET + 4] & 0xFF) << 24)
+        + ((initIV[CTR_OFFSET + 5] & 0xFF) << 16)
+        + ((initIV[CTR_OFFSET + 6] & 0xFF) << 8)
+        + (initIV[CTR_OFFSET + 7] & 0xFF);
+    l += counter;
+    IV[CTR_OFFSET + 0] = (byte) (l >>> 56);
+    IV[CTR_OFFSET + 1] = (byte) (l >>> 48);
+    IV[CTR_OFFSET + 2] = (byte) (l >>> 40);
+    IV[CTR_OFFSET + 3] = (byte) (l >>> 32);
+    IV[CTR_OFFSET + 4] = (byte) (l >>> 24);
+    IV[CTR_OFFSET + 5] = (byte) (l >>> 16);
+    IV[CTR_OFFSET + 6] = (byte) (l >>> 8);
+    IV[CTR_OFFSET + 7] = (byte) (l);
   }
 }
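
The patched calculateIV copies the first 8 bytes of the initial IV and then adds the block counter to the low 8 bytes by hand, avoiding a per-call ByteBuffer allocation. For reference, a minimal standalone sketch of the same big-endian add using a ByteBuffer (illustration only; class and method names here are hypothetical, not the patched code):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    /**
     * Illustration only: derive the IV for a given CTR-mode block counter by
     * adding the counter to the low 8 bytes of the initial IV, interpreted
     * big-endian. The patch above does the same math with manual shifts.
     */
    public final class CtrIvExample {
      private static final int AES_BLOCK_SIZE = 16;
      private static final int CTR_OFFSET = 8;

      static void calculateIV(byte[] initIV, long counter, byte[] iv) {
        ByteBuffer buf = ByteBuffer.wrap(iv).order(ByteOrder.BIG_ENDIAN);
        buf.put(initIV, 0, AES_BLOCK_SIZE);      // copy the 16-byte initial IV
        long base = buf.getLong(CTR_OFFSET);     // low 8 bytes as a big-endian long
        buf.putLong(CTR_OFFSET, base + counter); // add the block counter
      }

      public static void main(String[] args) {
        byte[] initIV = new byte[AES_BLOCK_SIZE]; // all-zero IV for the demo
        byte[] iv = new byte[AES_BLOCK_SIZE];
        calculateIV(initIV, 3L, iv);
        System.out.println(iv[15]);               // prints 3
      }
    }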

+ 6 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java

@@ -29,7 +29,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 /**
  * Crypto codec class, encapsulates encryptor/decryptor pair.
  */
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class CryptoCodec implements Configurable {
   
@@ -48,21 +48,21 @@ public abstract class CryptoCodec implements Configurable {
   public abstract int getAlgorithmBlockSize();
 
   /**
-   * Get an {@link #org.apache.hadoop.crypto.Encryptor}. 
+   * Create a {@link org.apache.hadoop.crypto.Encryptor}. 
    * @return Encryptor the encryptor
    */
-  public abstract Encryptor getEncryptor() throws GeneralSecurityException;
+  public abstract Encryptor createEncryptor() throws GeneralSecurityException;
   
   /**
-   * Get a {@link #org.apache.hadoop.crypto.Decryptor}.
+   * Create a {@link org.apache.hadoop.crypto.Decryptor}.
    * @return Decryptor the decryptor
    */
-  public abstract Decryptor getDecryptor() throws GeneralSecurityException;
+  public abstract Decryptor createDecryptor() throws GeneralSecurityException;
   
   /**
    * This interface is only for Counter (CTR) mode. Generally the Encryptor
    * or Decryptor calculates the IV and maintain encryption context internally. 
-   * For example a {@link #javax.crypto.Cipher} will maintain its encryption 
+   * For example a {@link javax.crypto.Cipher} will maintain its encryption 
    * context internally when we do encryption/decryption using the 
    * Cipher#update interface. 
    * <p/>
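
The rename from getEncryptor/getDecryptor to createEncryptor/createDecryptor makes it explicit that each call produces a new instance (the input stream below pools them). A minimal caller-side sketch, assuming an already-configured codec instance and 16-byte key/iv arrays supplied elsewhere:

    // codec is an already-configured CryptoCodec (e.g. JCEAESCTRCryptoCodec)
    Encryptor encryptor = codec.createEncryptor();  // was codec.getEncryptor()
    Decryptor decryptor = codec.createDecryptor();  // was codec.getDecryptor()
    encryptor.init(key, iv);                        // key and iv are 16-byte arrays
    decryptor.init(key, iv);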

+ 155 - 100
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

@@ -25,10 +25,11 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.security.GeneralSecurityException;
 import java.util.EnumSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -38,8 +39,6 @@ import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.ByteBufferPool;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
 
 import com.google.common.base.Preconditions;
 
@@ -54,15 +53,15 @@ import com.google.common.base.Preconditions;
  * <p/>
  * The underlying stream offset is maintained as state.
  */
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class CryptoInputStream extends FilterInputStream implements 
     Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
     CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
-  private static final int MIN_BUFFER_SIZE = 512;
   private static final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
+  private final int bufferSize;
   
   /**
    * Input data buffer. The data starts at inBuffer.position() and ends at 
@@ -79,7 +78,7 @@ public class CryptoInputStream extends FilterInputStream implements
   
   /**
    * Whether the underlying stream supports 
-   * {@link #org.apache.hadoop.fs.ByteBufferReadable}
+   * {@link org.apache.hadoop.fs.ByteBufferReadable}
    */
   private Boolean usingByteBufferRead = null;
   
@@ -94,32 +93,33 @@ public class CryptoInputStream extends FilterInputStream implements
   private final byte[] initIV;
   private byte[] iv;
   
+  /** DirectBuffer pool */
+  private final Queue<ByteBuffer> bufferPool = 
+      new ConcurrentLinkedQueue<ByteBuffer>();
+  /** Decryptor pool */
+  private final Queue<Decryptor> decryptorPool = 
+      new ConcurrentLinkedQueue<Decryptor>();
+  
   public CryptoInputStream(InputStream in, CryptoCodec codec, 
       int bufferSize, byte[] key, byte[] iv) throws IOException {
     super(in);
-    Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE, 
-        "Minimum value of buffer size is 512.");
-    this.key = key;
-    this.initIV = iv;
-    this.iv = iv.clone();
-    inBuffer = ByteBuffer.allocateDirect(bufferSize);
-    outBuffer = ByteBuffer.allocateDirect(bufferSize);
-    outBuffer.limit(0);
+    this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
     this.codec = codec;
-    try {
-      decryptor = codec.getDecryptor();
-    } catch (GeneralSecurityException e) {
-      throw new IOException(e);
-    }
+    this.key = key.clone();
+    this.initIV = iv.clone();
+    this.iv = iv.clone();
+    inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    decryptor = getDecryptor();
     if (in instanceof Seekable) {
       streamOffset = ((Seekable) in).getPos();
     }
-    updateDecryptor();
+    resetStreamOffset(streamOffset);
   }
   
   public CryptoInputStream(InputStream in, CryptoCodec codec,
       byte[] key, byte[] iv) throws IOException {
-    this(in, codec, getBufferSize(codec.getConf()), key, iv);
+    this(in, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), key, iv);
   }
   
   public InputStream getWrappedStream() {
@@ -169,14 +169,14 @@ public class CryptoInputStream extends FilterInputStream implements
             usingByteBufferRead = Boolean.FALSE;
           }
         }
-        if (!usingByteBufferRead.booleanValue()) {
-          n = readFromUnderlyingStream();
+        if (!usingByteBufferRead) {
+          n = readFromUnderlyingStream(inBuffer);
         }
       } else {
-        if (usingByteBufferRead.booleanValue()) {
+        if (usingByteBufferRead) {
           n = ((ByteBufferReadable) in).read(inBuffer);
         } else {
-          n = readFromUnderlyingStream();
+          n = readFromUnderlyingStream(inBuffer);
         }
       }
       if (n <= 0) {
@@ -184,7 +184,8 @@ public class CryptoInputStream extends FilterInputStream implements
       }
       
       streamOffset += n; // Read n bytes
-      decrypt();
+      decrypt(decryptor, inBuffer, outBuffer, padding);
+      padding = afterDecryption(decryptor, inBuffer, streamOffset, iv);
       n = Math.min(len, outBuffer.remaining());
       outBuffer.get(b, off, n);
       return n;
@@ -192,7 +193,7 @@ public class CryptoInputStream extends FilterInputStream implements
   }
   
   /** Read data from underlying stream. */
-  private int readFromUnderlyingStream() throws IOException {
+  private int readFromUnderlyingStream(ByteBuffer inBuffer) throws IOException {
     final int toRead = inBuffer.remaining();
     final byte[] tmp = getTmpBuf();
     final int n = in.read(tmp, 0, toRead);
@@ -205,16 +206,18 @@ public class CryptoInputStream extends FilterInputStream implements
   private byte[] tmpBuf;
   private byte[] getTmpBuf() {
     if (tmpBuf == null) {
-      tmpBuf = new byte[inBuffer.capacity()];
+      tmpBuf = new byte[bufferSize];
     }
     return tmpBuf;
   }
   
   /**
-   * Do the decryption using {@link #inBuffer} as input and {@link #outBuffer} 
-   * as output.
+   * Do the decryption using inBuffer as input and outBuffer as output.
+   * Upon return, inBuffer is cleared; the decrypted data starts at 
+   * outBuffer.position() and ends at outBuffer.limit();
    */
-  private void decrypt() throws IOException {
+  private void decrypt(Decryptor decryptor, ByteBuffer inBuffer, 
+      ByteBuffer outBuffer, byte padding) throws IOException {
     Preconditions.checkState(inBuffer.position() >= padding);
     if(inBuffer.position() == padding) {
       // There is no real data in inBuffer.
@@ -231,8 +234,16 @@ public class CryptoInputStream extends FilterInputStream implements
        * same position.
        */
       outBuffer.position(padding);
-      padding = 0;
     }
+  }
+  
+  /**
+   * This method is executed immediately after decryption. Check whether 
+   * decryptor should be updated and recalculate padding if needed. 
+   */
+  private byte afterDecryption(Decryptor decryptor, ByteBuffer inBuffer, 
+      long position, byte[] iv) throws IOException {
+    byte padding = 0;
     if (decryptor.isContextReset()) {
       /*
        * This code is generally not executed since the decryptor usually 
@@ -240,23 +251,31 @@ public class CryptoInputStream extends FilterInputStream implements
        * some implementations can't maintain context so a re-init is necessary 
        * after each decryption call.
        */
-      updateDecryptor();
+      updateDecryptor(decryptor, position, iv);
+      padding = getPadding(position);
+      inBuffer.position(padding);
     }
+    return padding;
   }
   
-  /**
-   * Update the {@link #decryptor}. Calculate the counter and {@link #padding}.
-   */
-  private void updateDecryptor() throws IOException {
-    final long counter = streamOffset / codec.getAlgorithmBlockSize();
-    padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
-    inBuffer.position(padding); // Set proper position for input data.
+  private long getCounter(long position) {
+    return position / codec.getAlgorithmBlockSize();
+  }
+  
+  private byte getPadding(long position) {
+    return (byte)(position % codec.getAlgorithmBlockSize());
+  }
+  
+  /** Calculate the counter and iv, update the decryptor. */
+  private void updateDecryptor(Decryptor decryptor, long position, byte[] iv) 
+      throws IOException {
+    final long counter = getCounter(position);
     codec.calculateIV(initIV, counter, iv);
     decryptor.init(key, iv);
   }
   
   /**
-   * Reset the underlying stream offset, and clear {@link #inBuffer} and 
+   * Reset the underlying stream offset; clear {@link #inBuffer} and 
    * {@link #outBuffer}. This Typically happens during {@link #seek(long)} 
    * or {@link #skip(long)}.
    */
@@ -265,7 +284,9 @@ public class CryptoInputStream extends FilterInputStream implements
     inBuffer.clear();
     outBuffer.clear();
     outBuffer.limit(0);
-    updateDecryptor();
+    updateDecryptor(decryptor, offset, iv);
+    padding = getPadding(offset);
+    inBuffer.position(padding); // Set proper position for input data.
   }
   
   @Override
@@ -279,17 +300,7 @@ public class CryptoInputStream extends FilterInputStream implements
     closed = true;
   }
   
-  /** Forcibly free the direct buffer. */
-  private void freeBuffers() {
-    final sun.misc.Cleaner inBufferCleaner =
-        ((sun.nio.ch.DirectBuffer) inBuffer).cleaner();
-    inBufferCleaner.clean();
-    final sun.misc.Cleaner outBufferCleaner =
-        ((sun.nio.ch.DirectBuffer) outBuffer).cleaner();
-    outBufferCleaner.clean();
-  }
-  
-  /** Positioned read. */
+  /** Positioned read. It is thread-safe */
   @Override
   public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
@@ -298,12 +309,8 @@ public class CryptoInputStream extends FilterInputStream implements
       final int n = ((PositionedReadable) in).read(position, buffer, offset, 
           length);
       if (n > 0) {
-        /*
-         * Since this operation does not change the current offset of a file, 
-         * streamOffset should not be changed. We need to restore the decryptor 
-         * and outBuffer after decryption.
-         */
-        decrypt(position, buffer, offset, length);
+        // This operation does not change the current offset of the file
+        decrypt(position, buffer, offset, n);
       }
       
       return n;
@@ -315,39 +322,39 @@ public class CryptoInputStream extends FilterInputStream implements
   
   /**
    * Decrypt length bytes in buffer starting at offset. Output is also put 
-   * into buffer starting at offset. Restore the {@link #decryptor} and 
-   * {@link #outBuffer} after the decryption.
+   * into buffer starting at offset. It is thread-safe.
    */
   private void decrypt(long position, byte[] buffer, int offset, int length) 
       throws IOException {
-    final byte[] tmp = getTmpBuf();
-    int unread = outBuffer.remaining();
-    if (unread > 0) { // Cache outBuffer
-      outBuffer.get(tmp, 0, unread);
-    }
-    final long curOffset = streamOffset;
-    resetStreamOffset(position);
-    
-    int n = 0;
-    while (n < length) {
-      final int toDecrypt = Math.min(length - n, inBuffer.remaining());
-      inBuffer.put(buffer, offset + n, toDecrypt);
-      // Do decryption
-      decrypt();
-      outBuffer.get(buffer, offset + n, toDecrypt);
-      n += toDecrypt;
-    }
-    
-    // After decryption
-    resetStreamOffset(curOffset);
-    if (unread > 0) { // Restore outBuffer
-      outBuffer.clear();
-      outBuffer.put(tmp, 0, unread);
-      outBuffer.flip();
+    ByteBuffer inBuffer = getBuffer();
+    ByteBuffer outBuffer = getBuffer();
+    Decryptor decryptor = null;
+    try {
+      decryptor = getDecryptor();
+      byte[] iv = initIV.clone();
+      updateDecryptor(decryptor, position, iv);
+      byte padding = getPadding(position);
+      inBuffer.position(padding); // Set proper position for input data.
+      
+      int n = 0;
+      while (n < length) {
+        int toDecrypt = Math.min(length - n, inBuffer.remaining());
+        inBuffer.put(buffer, offset + n, toDecrypt);
+        // Do decryption
+        decrypt(decryptor, inBuffer, outBuffer, padding);
+        
+        outBuffer.get(buffer, offset + n, toDecrypt);
+        n += toDecrypt;
+        padding = afterDecryption(decryptor, inBuffer, position + n, iv);
+      }
+    } finally {
+      returnBuffer(inBuffer);
+      returnBuffer(outBuffer);
+      returnDecryptor(decryptor);
     }
   }
   
-  /** Positioned read fully. */
+  /** Positioned read fully. It is thread-safe */
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
       throws IOException {
@@ -355,11 +362,7 @@ public class CryptoInputStream extends FilterInputStream implements
     try {
       ((PositionedReadable) in).readFully(position, buffer, offset, length);
       if (length > 0) {
-        /*
-         * Since this operation does not change the current offset of the file, 
-         * streamOffset should not be changed. We need to restore the decryptor 
-         * and outBuffer after decryption.
-         */
+        // This operation does not change the current offset of the file
         decrypt(position, buffer, offset, length);
       }
     } catch (ClassCastException e) {
@@ -484,12 +487,15 @@ public class CryptoInputStream extends FilterInputStream implements
       buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
       inBuffer.put(buf);
       // Do decryption
-      decrypt();
-      
-      buf.position(start + len);
-      buf.limit(limit);
-      len += outBuffer.remaining();
-      buf.put(outBuffer);
+      try {
+        decrypt(decryptor, inBuffer, outBuffer, padding);
+        buf.position(start + len);
+        buf.limit(limit);
+        len += outBuffer.remaining();
+        buf.put(outBuffer);
+      } finally {
+        padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv);
+      }
     }
     buf.position(pos);
   }
@@ -612,8 +618,57 @@ public class CryptoInputStream extends FilterInputStream implements
     }
   }
   
-  private static int getBufferSize(Configuration conf) {
-    return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, 
-        HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT);
+  /** Get direct buffer from pool */
+  private ByteBuffer getBuffer() {
+    ByteBuffer buffer = bufferPool.poll();
+    if (buffer == null) {
+      buffer = ByteBuffer.allocateDirect(bufferSize);
+    }
+    
+    return buffer;
+  }
+  
+  /** Return direct buffer to pool */
+  private void returnBuffer(ByteBuffer buf) {
+    if (buf != null) {
+      buf.clear();
+      bufferPool.add(buf);
+    }
+  }
+  
+  /** Forcibly free the direct buffers. */
+  private void freeBuffers() {
+    CryptoStreamUtils.freeDB(inBuffer);
+    CryptoStreamUtils.freeDB(outBuffer);
+    cleanBufferPool();
+  }
+  
+  /** Clean direct buffer pool */
+  private void cleanBufferPool() {
+    ByteBuffer buf;
+    while ((buf = bufferPool.poll()) != null) {
+      CryptoStreamUtils.freeDB(buf);
+    }
+  }
+  
+  /** Get decryptor from pool */
+  private Decryptor getDecryptor() throws IOException {
+    Decryptor decryptor = decryptorPool.poll();
+    if (decryptor == null) {
+      try {
+        decryptor = codec.createDecryptor();
+      } catch (GeneralSecurityException e) {
+        throw new IOException(e);
+      }
+    }
+    
+    return decryptor;
+  }
+  
+  /** Return decryptor to pool */
+  private void returnDecryptor(Decryptor decryptor) {
+    if (decryptor != null) {
+      decryptorPool.add(decryptor);
+    }
   }
 }
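
The positioned read path (read(long, ...) and readFully(long, ...)) is now thread-safe because each call borrows a direct buffer and a Decryptor from ConcurrentLinkedQueue pools instead of mutating the stream's own inBuffer, outBuffer, and decryptor. A stripped-down sketch of that borrow/return pattern (generic names, not the stream's fields):

    import java.nio.ByteBuffer;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    /** Illustration of the pool pattern used for positioned reads. */
    class DirectBufferPool {
      private final int bufferSize;
      private final Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<ByteBuffer>();

      DirectBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
      }

      /** Borrow a buffer; allocate lazily when the pool is empty. */
      ByteBuffer get() {
        ByteBuffer buf = pool.poll();
        return buf != null ? buf : ByteBuffer.allocateDirect(bufferSize);
      }

      /** Return a buffer after use; clear it so the next caller starts fresh. */
      void release(ByteBuffer buf) {
        if (buf != null) {
          buf.clear();
          pool.add(buf);
        }
      }
    }

Callers wrap the borrowed resources in try/finally, as the new decrypt(long, buffer, offset, length) does, so they are returned to the pool even when decryption throws.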

+ 17 - 31
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java

@@ -25,11 +25,8 @@ import java.security.GeneralSecurityException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.Syncable;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
 
 import com.google.common.base.Preconditions;
 
@@ -44,14 +41,14 @@ import com.google.common.base.Preconditions;
  * <p/>
  * The underlying stream offset is maintained as state.
  */
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class CryptoOutputStream extends FilterOutputStream implements 
     Syncable, CanSetDropBehind {
-  private static final int MIN_BUFFER_SIZE = 512;
   private static final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Encryptor encryptor;
+  private final int bufferSize;
   
   /**
    * Input data buffer. The data starts at inBuffer.position() and ends at 
@@ -86,17 +83,16 @@ public class CryptoOutputStream extends FilterOutputStream implements
       int bufferSize, byte[] key, byte[] iv, long streamOffset) 
       throws IOException {
     super(out);
-    Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE, 
-        "Minimum value of buffer size is 512.");
-    this.key = key;
-    this.initIV = iv;
+    this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
+    this.codec = codec;
+    this.key = key.clone();
+    this.initIV = iv.clone();
     this.iv = iv.clone();
-    inBuffer = ByteBuffer.allocateDirect(bufferSize);
-    outBuffer = ByteBuffer.allocateDirect(bufferSize);
+    inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     this.streamOffset = streamOffset;
-    this.codec = codec;
     try {
-      encryptor = codec.getEncryptor();
+      encryptor = codec.createEncryptor();
     } catch (GeneralSecurityException e) {
       throw new IOException(e);
     }
@@ -110,7 +106,8 @@ public class CryptoOutputStream extends FilterOutputStream implements
   
   public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
       byte[] key, byte[] iv, long streamOffset) throws IOException {
-    this(out, codec, getBufferSize(codec.getConf()), key, iv, streamOffset);
+    this(out, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), 
+        key, iv, streamOffset);
   }
   
   public OutputStream getWrappedStream() {
@@ -195,9 +192,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
     }
   }
   
-  /**
-   * Update the {@link #encryptor}: calculate counter and {@link #padding}.
-   */
+  /** Update the {@link #encryptor}: calculate counter and {@link #padding}. */
   private void updateEncryptor() throws IOException {
     final long counter = streamOffset / codec.getAlgorithmBlockSize();
     padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
@@ -209,7 +204,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
   private byte[] tmpBuf;
   private byte[] getTmpBuf() {
     if (tmpBuf == null) {
-      tmpBuf = new byte[outBuffer.capacity()];
+      tmpBuf = new byte[bufferSize];
     }
     return tmpBuf;
   }
@@ -225,16 +220,6 @@ public class CryptoOutputStream extends FilterOutputStream implements
     closed = true;
   }
   
-  /** Forcibly free the direct buffer. */
-  private void freeBuffers() {
-    final sun.misc.Cleaner inBufferCleaner =
-        ((sun.nio.ch.DirectBuffer) inBuffer).cleaner();
-    inBufferCleaner.clean();
-    final sun.misc.Cleaner outBufferCleaner =
-        ((sun.nio.ch.DirectBuffer) outBuffer).cleaner();
-    outBufferCleaner.clean();
-  }
-  
   /**
    * To flush, we need to encrypt the data in the buffer and write to the 
    * underlying stream, then do the flush.
@@ -285,8 +270,9 @@ public class CryptoOutputStream extends FilterOutputStream implements
     }
   }
   
-  private static int getBufferSize(Configuration conf) {
-    return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, 
-        HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT);
+  /** Forcibly free the direct buffers. */
+  private void freeBuffers() {
+    CryptoStreamUtils.freeDB(inBuffer);
+    CryptoStreamUtils.freeDB(outBuffer);
   }
 }
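
End to end, the pair of streams is used like any other FilterInputStream/FilterOutputStream wrapper. A hedged roundtrip sketch over in-memory streams, assuming a zero key and IV purely for the demo (not how keys are provisioned in HDFS encryption):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.CryptoCodec;
    import org.apache.hadoop.crypto.CryptoInputStream;
    import org.apache.hadoop.crypto.CryptoOutputStream;
    import org.apache.hadoop.crypto.JCEAESCTRCryptoCodec;

    public class CryptoStreamRoundTrip {
      public static void main(String[] args) throws Exception {
        CryptoCodec codec = new JCEAESCTRCryptoCodec();
        codec.setConf(new Configuration());
        byte[] key = new byte[16];   // demo key: all zeros
        byte[] iv = new byte[16];    // demo initial IV: all zeros

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        CryptoOutputStream cos = new CryptoOutputStream(sink, codec, key, iv, 0L);
        cos.write("hello, crypto streams".getBytes());
        cos.close();                 // flushes buffered data, then closes

        CryptoInputStream cis = new CryptoInputStream(
            new ByteArrayInputStream(sink.toByteArray()), codec, key, iv);
        byte[] back = new byte[21];
        int n = cis.read(back);
        System.out.println(new String(back, 0, n)); // expected: hello, crypto streams
        cis.close();
      }
    }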

+ 55 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoStreamUtils.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+public class CryptoStreamUtils {
+  private static final int MIN_BUFFER_SIZE = 512;
+  
+  /** Forcibly free the direct buffer. */
+  public static void freeDB(ByteBuffer buffer) {
+    if (buffer instanceof sun.nio.ch.DirectBuffer) {
+      final sun.misc.Cleaner bufferCleaner =
+          ((sun.nio.ch.DirectBuffer) buffer).cleaner();
+      bufferCleaner.clean();
+    }
+  }
+  
+  /** Read crypto buffer size */
+  public static int getBufferSize(Configuration conf) {
+    return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, 
+        HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT);
+  }
+  
+  /** Check and floor buffer size */
+  public static int checkBufferSize(CryptoCodec codec, int bufferSize) {
+    Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE, 
+        "Minimum value of buffer size is " + MIN_BUFFER_SIZE + ".");
+    return bufferSize - bufferSize % codec.getAlgorithmBlockSize();
+  }
+}
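
checkBufferSize now both enforces the 512-byte minimum and floors the requested size down to a multiple of the codec's block size, so the streams' direct buffers always hold whole AES blocks. A worked example of the flooring arithmetic (values illustrative):

    // With the 16-byte AES block size:
    //   checkBufferSize(codec, 8193) -> 8193 - (8193 % 16) = 8192
    //   checkBufferSize(codec, 8192) -> 8192 - (8192 % 16) = 8192
    //   checkBufferSize(codec,  511) -> IllegalArgumentException (below MIN_BUFFER_SIZE)
    int blockSize = 16;
    int requested = 8193;
    int floored = requested - requested % blockSize;  // 8192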

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java

@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface Decryptor {
   

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java

@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface Encryptor {
   

+ 89 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java

@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.crypto;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.GeneralSecurityException;
 
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY;
 
 /**
  * Implement the AES-CTR crypto codec using JCE provider.
  */
+@InterfaceAudience.Private
 public class JCEAESCTRCryptoCodec extends AESCTRCryptoCodec {
   private Configuration conf;
   private String provider;
@@ -44,12 +55,86 @@ public class JCEAESCTRCryptoCodec extends AESCTRCryptoCodec {
   }
 
   @Override
-  public Encryptor getEncryptor() throws GeneralSecurityException {
-    return new JCEAESCTREncryptor(provider);
+  public Encryptor createEncryptor() throws GeneralSecurityException {
+    return new JCEAESCTRCipher(Cipher.ENCRYPT_MODE, provider);
   }
 
   @Override
-  public Decryptor getDecryptor() throws GeneralSecurityException {
-    return new JCEAESCTRDecryptor(provider);
+  public Decryptor createDecryptor() throws GeneralSecurityException {
+    return new JCEAESCTRCipher(Cipher.DECRYPT_MODE, provider);
+  }
+  
+  private static class JCEAESCTRCipher implements Encryptor, Decryptor {
+    private final Cipher cipher;
+    private final int mode;
+    private boolean contextReset = false;
+    
+    public JCEAESCTRCipher(int mode, String provider) 
+        throws GeneralSecurityException {
+      this.mode = mode;
+      if (provider == null || provider.isEmpty()) {
+        cipher = Cipher.getInstance("AES/CTR/NoPadding");
+      } else {
+        cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
+      }
+    }
+
+    @Override
+    public void init(byte[] key, byte[] iv) throws IOException {
+      Preconditions.checkNotNull(key);
+      Preconditions.checkNotNull(iv);
+      contextReset = false;
+      try {
+        cipher.init(mode, new SecretKeySpec(key, "AES"), 
+            new IvParameterSpec(iv));
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * AES-CTR will consume all of the input data. It requires enough space in 
+     * the destination buffer to encrypt entire input buffer.
+     */
+    @Override
+    public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+        throws IOException {
+      process(inBuffer, outBuffer);
+    }
+    
+    /**
+     * AES-CTR will consume all of the input data. It requires enough space in
+     * the destination buffer to decrypt entire input buffer.
+     */
+    @Override
+    public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+        throws IOException {
+      process(inBuffer, outBuffer);
+    }
+    
+    private void process(ByteBuffer inBuffer, ByteBuffer outBuffer)
+        throws IOException {
+      try {
+        int inputSize = inBuffer.remaining();
+        // Cipher#update will maintain crypto context.
+        int n = cipher.update(inBuffer, outBuffer);
+        if (n < inputSize) {
+          /**
+           * Typically code will not get here. Cipher#update will consume all 
+           * input data and put result in outBuffer. 
+           * Cipher#doFinal will reset the crypto context.
+           */
+          contextReset = true;
+          cipher.doFinal(inBuffer, outBuffer);
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    
+    @Override
+    public boolean isContextReset() {
+      return contextReset;
+    }
   }
 }
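
The former JCEAESCTREncryptor and JCEAESCTRDecryptor are collapsed into a single JCEAESCTRCipher constructed with either Cipher.ENCRYPT_MODE or Cipher.DECRYPT_MODE, relying on Cipher#update to keep the CTR context between calls. A self-contained roundtrip against plain JCE with the same "AES/CTR/NoPadding" transformation (demo key and IV are all zeros):

    import java.util.Arrays;
    import javax.crypto.Cipher;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    public class JceCtrRoundTrip {
      public static void main(String[] args) throws Exception {
        byte[] key = new byte[16];   // demo key: all zeros
        byte[] iv = new byte[16];    // demo IV: all zeros
        byte[] plain = "attack at dawn!!".getBytes();  // 16 bytes

        Cipher enc = Cipher.getInstance("AES/CTR/NoPadding");
        enc.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
            new IvParameterSpec(iv));
        byte[] cipherText = enc.update(plain);  // update() keeps the CTR context

        Cipher dec = Cipher.getInstance("AES/CTR/NoPadding");
        dec.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
            new IvParameterSpec(iv));
        byte[] roundTrip = dec.update(cipherText);

        System.out.println(Arrays.equals(plain, roundTrip));  // true
      }
    }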

+ 0 - 84
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java

@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.crypto;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.GeneralSecurityException;
-
-import javax.crypto.Cipher;
-import javax.crypto.spec.IvParameterSpec;
-import javax.crypto.spec.SecretKeySpec;
-
-import com.google.common.base.Preconditions;
-
-public class JCEAESCTRDecryptor implements Decryptor {
-  private final Cipher cipher;
-  private boolean contextReset = false;
-  
-  public JCEAESCTRDecryptor(String provider) throws GeneralSecurityException {
-    if (provider == null || provider.isEmpty()) {
-      cipher = Cipher.getInstance("AES/CTR/NoPadding");
-    } else {
-      cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
-    }
-  }
-
-  @Override
-  public void init(byte[] key, byte[] iv) throws IOException {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(iv);
-    contextReset = false;
-    try {
-      cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), 
-          new IvParameterSpec(iv));
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * AES-CTR will consume all of the input data. It requires enough space in
-   * the destination buffer to decrypt entire input buffer.
-   */
-  @Override
-  public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
-      throws IOException {
-    try {
-      int inputSize = inBuffer.remaining();
-      // Cipher#update will maintain decryption context.
-      int n = cipher.update(inBuffer, outBuffer);
-      if (n < inputSize) {
-        /**
-         * Typically code will not get here. Cipher#update will decrypt all 
-         * input data and put result in outBuffer. 
-         * Cipher#doFinal will reset the decryption context.
-         */
-        contextReset = true;
-        cipher.doFinal(inBuffer, outBuffer);
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public boolean isContextReset() {
-    return contextReset;
-  }
-}

+ 0 - 84
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java

@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.crypto;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.GeneralSecurityException;
-
-import javax.crypto.Cipher;
-import javax.crypto.spec.IvParameterSpec;
-import javax.crypto.spec.SecretKeySpec;
-
-import com.google.common.base.Preconditions;
-
-public class JCEAESCTREncryptor implements Encryptor {
-  private final Cipher cipher;
-  private boolean contextReset = false;
-  
-  public JCEAESCTREncryptor(String provider) throws GeneralSecurityException {
-    if (provider == null || provider.isEmpty()) {
-      cipher = Cipher.getInstance("AES/CTR/NoPadding");
-    } else {
-      cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
-    }
-  }
-
-  @Override
-  public void init(byte[] key, byte[] iv) throws IOException {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(iv);
-    contextReset = false;
-    try {
-      cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), 
-          new IvParameterSpec(iv));
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * AES-CTR will consume all of the input data. It requires enough space in 
-   * the destination buffer to encrypt entire input buffer.
-   */
-  @Override
-  public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
-      throws IOException {
-    try {
-      int inputSize = inBuffer.remaining();
-      // Cipher#update will maintain encryption context.
-      int n = cipher.update(inBuffer, outBuffer);
-      if (n < inputSize) {
-        /**
-         * Typically code will not get here. Cipher#update will encrypt all 
-         * input data and put result in outBuffer. 
-         * Cipher#doFinal will reset the encryption context.
-         */
-        contextReset = true;
-        cipher.doFinal(inBuffer, outBuffer);
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public boolean isContextReset() {
-    return contextReset;
-  }
-}