|
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
|
|
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
|
|
import org.apache.hadoop.fs.CanSetDropBehind;
|
|
|
import org.apache.hadoop.fs.CanSetReadahead;
|
|
@@ -64,7 +65,8 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
public class CryptoInputStream extends FilterInputStream implements
|
|
|
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
|
|
|
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
|
|
|
- ReadableByteChannel, CanUnbuffer, StreamCapabilities {
|
|
|
+ ReadableByteChannel, CanUnbuffer, StreamCapabilities,
|
|
|
+ ByteBufferPositionedReadable {
|
|
|
private final byte[] oneByteBuf = new byte[1];
|
|
|
private final CryptoCodec codec;
|
|
|
private final Decryptor decryptor;
|
|
@@ -327,19 +329,39 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
public int read(long position, byte[] buffer, int offset, int length)
|
|
|
throws IOException {
|
|
|
checkStream();
|
|
|
- try {
|
|
|
- final int n = ((PositionedReadable) in).read(position, buffer, offset,
|
|
|
- length);
|
|
|
- if (n > 0) {
|
|
|
- // This operation does not change the current offset of the file
|
|
|
- decrypt(position, buffer, offset, n);
|
|
|
- }
|
|
|
-
|
|
|
- return n;
|
|
|
- } catch (ClassCastException e) {
|
|
|
+ if (!(in instanceof PositionedReadable)) {
|
|
|
throw new UnsupportedOperationException("This stream does not support " +
|
|
|
"positioned read.");
|
|
|
}
|
|
|
+ final int n = ((PositionedReadable) in).read(position, buffer, offset,
|
|
|
+ length);
|
|
|
+ if (n > 0) {
|
|
|
+ // This operation does not change the current offset of the file
|
|
|
+ decrypt(position, buffer, offset, n);
|
|
|
+ }
|
|
|
+
|
|
|
+ return n;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Positioned read using {@link ByteBuffer}s. This method is thread-safe.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int read(long position, final ByteBuffer buf)
|
|
|
+ throws IOException {
|
|
|
+ checkStream();
|
|
|
+ if (!(in instanceof ByteBufferPositionedReadable)) {
|
|
|
+ throw new UnsupportedOperationException("This stream does not support " +
|
|
|
+ "positioned reads with byte buffers.");
|
|
|
+ }
|
|
|
+ int bufPos = buf.position();
|
|
|
+ final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
|
|
|
+ if (n > 0) {
|
|
|
+ // This operation does not change the current offset of the file
|
|
|
+ decrypt(position, buf, n, bufPos);
|
|
|
+ }
|
|
|
+
|
|
|
+ return n;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -348,49 +370,124 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
*/
|
|
|
private void decrypt(long position, byte[] buffer, int offset, int length)
|
|
|
throws IOException {
|
|
|
- ByteBuffer inBuffer = getBuffer();
|
|
|
- ByteBuffer outBuffer = getBuffer();
|
|
|
+ ByteBuffer localInBuffer = null;
|
|
|
+ ByteBuffer localOutBuffer = null;
|
|
|
Decryptor decryptor = null;
|
|
|
try {
|
|
|
+ localInBuffer = getBuffer();
|
|
|
+ localOutBuffer = getBuffer();
|
|
|
decryptor = getDecryptor();
|
|
|
byte[] iv = initIV.clone();
|
|
|
updateDecryptor(decryptor, position, iv);
|
|
|
byte padding = getPadding(position);
|
|
|
- inBuffer.position(padding); // Set proper position for input data.
|
|
|
+ localInBuffer.position(padding); // Set proper position for input data.
|
|
|
|
|
|
int n = 0;
|
|
|
while (n < length) {
|
|
|
- int toDecrypt = Math.min(length - n, inBuffer.remaining());
|
|
|
- inBuffer.put(buffer, offset + n, toDecrypt);
|
|
|
+ int toDecrypt = Math.min(length - n, localInBuffer.remaining());
|
|
|
+ localInBuffer.put(buffer, offset + n, toDecrypt);
|
|
|
// Do decryption
|
|
|
- decrypt(decryptor, inBuffer, outBuffer, padding);
|
|
|
+ decrypt(decryptor, localInBuffer, localOutBuffer, padding);
|
|
|
|
|
|
- outBuffer.get(buffer, offset + n, toDecrypt);
|
|
|
+ localOutBuffer.get(buffer, offset + n, toDecrypt);
|
|
|
n += toDecrypt;
|
|
|
- padding = afterDecryption(decryptor, inBuffer, position + n, iv);
|
|
|
+ padding = afterDecryption(decryptor, localInBuffer, position + n, iv);
|
|
|
}
|
|
|
} finally {
|
|
|
- returnBuffer(inBuffer);
|
|
|
- returnBuffer(outBuffer);
|
|
|
+ returnBuffer(localInBuffer);
|
|
|
+ returnBuffer(localOutBuffer);
|
|
|
returnDecryptor(decryptor);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
|
|
|
+ * decrypted from {@code buf} starting at {@code start}.
|
|
|
+ * {@code buf.position()} and {@code buf.limit()} are unchanged after this
|
|
|
+ * method returns. This method is thread-safe.
|
|
|
+ *
|
|
|
+ * <p>
|
|
|
+ * This method decrypts the input buf chunk-by-chunk and writes the
|
|
|
+ * decrypted output back into the input buf. It uses two local buffers
|
|
|
+ * taken from the {@link #bufferPool} to assist in this process: one is
|
|
|
+ * designated as the input buffer and it stores a single chunk of the
|
|
|
+ * given buf, the other is designated as the output buffer, which stores
|
|
|
+ * the output of decrypting the input buffer. Both buffers are of size
|
|
|
+ * {@link #bufferSize}.
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * <p>
|
|
|
+ * Decryption is done by using a {@link Decryptor} and the
|
|
|
+ * {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once
|
|
|
+ * the decrypted data is written into the output buffer, is is copied back
|
|
|
+ * into buf. Both buffers are returned back into the pool once the entire
|
|
|
+ * buf is decrypted.
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * @param filePosition the current position of the file being read
|
|
|
+ * @param buf the {@link ByteBuffer} to decrypt
|
|
|
+ * @param length the number of bytes in {@code buf} to decrypt
|
|
|
+ * @param start the position in {@code buf} to start decrypting data from
|
|
|
+ */
|
|
|
+ private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
|
|
|
+ throws IOException {
|
|
|
+ ByteBuffer localInBuffer = null;
|
|
|
+ ByteBuffer localOutBuffer = null;
|
|
|
+
|
|
|
+ // Duplicate the buffer so we don't have to worry about resetting the
|
|
|
+ // original position and limit at the end of the method
|
|
|
+ buf = buf.duplicate();
|
|
|
+
|
|
|
+ int decryptedBytes = 0;
|
|
|
+ Decryptor localDecryptor = null;
|
|
|
+ try {
|
|
|
+ localInBuffer = getBuffer();
|
|
|
+ localOutBuffer = getBuffer();
|
|
|
+ localDecryptor = getDecryptor();
|
|
|
+ byte[] localIV = initIV.clone();
|
|
|
+ updateDecryptor(localDecryptor, filePosition, localIV);
|
|
|
+ byte localPadding = getPadding(filePosition);
|
|
|
+ // Set proper filePosition for inputdata.
|
|
|
+ localInBuffer.position(localPadding);
|
|
|
+
|
|
|
+ while (decryptedBytes < length) {
|
|
|
+ buf.position(start + decryptedBytes);
|
|
|
+ buf.limit(start + decryptedBytes +
|
|
|
+ Math.min(length - decryptedBytes, localInBuffer.remaining()));
|
|
|
+ localInBuffer.put(buf);
|
|
|
+ // Do decryption
|
|
|
+ try {
|
|
|
+ decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
|
|
|
+ buf.position(start + decryptedBytes);
|
|
|
+ buf.limit(start + length);
|
|
|
+ decryptedBytes += localOutBuffer.remaining();
|
|
|
+ buf.put(localOutBuffer);
|
|
|
+ } finally {
|
|
|
+ localPadding = afterDecryption(localDecryptor, localInBuffer,
|
|
|
+ filePosition + length, localIV);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ returnBuffer(localInBuffer);
|
|
|
+ returnBuffer(localOutBuffer);
|
|
|
+ returnDecryptor(localDecryptor);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Positioned read fully. It is thread-safe */
|
|
|
@Override
|
|
|
public void readFully(long position, byte[] buffer, int offset, int length)
|
|
|
throws IOException {
|
|
|
checkStream();
|
|
|
- try {
|
|
|
- ((PositionedReadable) in).readFully(position, buffer, offset, length);
|
|
|
- if (length > 0) {
|
|
|
- // This operation does not change the current offset of the file
|
|
|
- decrypt(position, buffer, offset, length);
|
|
|
- }
|
|
|
- } catch (ClassCastException e) {
|
|
|
+ if (!(in instanceof PositionedReadable)) {
|
|
|
throw new UnsupportedOperationException("This stream does not support " +
|
|
|
"positioned readFully.");
|
|
|
}
|
|
|
+ ((PositionedReadable) in).readFully(position, buffer, offset, length);
|
|
|
+ if (length > 0) {
|
|
|
+ // This operation does not change the current offset of the file
|
|
|
+ decrypt(position, buffer, offset, length);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -405,23 +502,22 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
|
|
}
|
|
|
checkStream();
|
|
|
- try {
|
|
|
- /*
|
|
|
- * If data of target pos in the underlying stream has already been read
|
|
|
- * and decrypted in outBuffer, we just need to re-position outBuffer.
|
|
|
- */
|
|
|
- if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
|
|
|
- int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
|
|
|
- if (forward > 0) {
|
|
|
- outBuffer.position(outBuffer.position() + forward);
|
|
|
- }
|
|
|
- } else {
|
|
|
- ((Seekable) in).seek(pos);
|
|
|
- resetStreamOffset(pos);
|
|
|
+ /*
|
|
|
+ * If data of target pos in the underlying stream has already been read
|
|
|
+ * and decrypted in outBuffer, we just need to re-position outBuffer.
|
|
|
+ */
|
|
|
+ if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
|
|
|
+ int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
|
|
|
+ if (forward > 0) {
|
|
|
+ outBuffer.position(outBuffer.position() + forward);
|
|
|
}
|
|
|
- } catch (ClassCastException e) {
|
|
|
- throw new UnsupportedOperationException("This stream does not support " +
|
|
|
- "seek.");
|
|
|
+ } else {
|
|
|
+ if (!(in instanceof Seekable)) {
|
|
|
+ throw new UnsupportedOperationException("This stream does not " +
|
|
|
+ "support seek.");
|
|
|
+ }
|
|
|
+ ((Seekable) in).seek(pos);
|
|
|
+ resetStreamOffset(pos);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -519,31 +615,34 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Decrypt all data in buf: total n bytes from given start position.
|
|
|
- * Output is also buf and same start position.
|
|
|
- * buf.position() and buf.limit() should be unchanged after decryption.
|
|
|
+ * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
|
|
|
+ * decrypted from {@code buf} starting at {@code start}.
|
|
|
+ * {@code buf.position()} and {@code buf.limit()} are unchanged after this
|
|
|
+ * method returns.
|
|
|
+ *
|
|
|
+ * @see #decrypt(long, ByteBuffer, int, int)
|
|
|
*/
|
|
|
- private void decrypt(ByteBuffer buf, int n, int start)
|
|
|
+ private void decrypt(ByteBuffer buf, int length, int start)
|
|
|
throws IOException {
|
|
|
- final int pos = buf.position();
|
|
|
- final int limit = buf.limit();
|
|
|
- int len = 0;
|
|
|
- while (len < n) {
|
|
|
- buf.position(start + len);
|
|
|
- buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
|
|
|
+ buf = buf.duplicate();
|
|
|
+ int decryptedBytes = 0;
|
|
|
+ while (decryptedBytes < length) {
|
|
|
+ buf.position(start + decryptedBytes);
|
|
|
+ buf.limit(start + decryptedBytes +
|
|
|
+ Math.min(length - decryptedBytes, inBuffer.remaining()));
|
|
|
inBuffer.put(buf);
|
|
|
// Do decryption
|
|
|
try {
|
|
|
decrypt(decryptor, inBuffer, outBuffer, padding);
|
|
|
- buf.position(start + len);
|
|
|
- buf.limit(limit);
|
|
|
- len += outBuffer.remaining();
|
|
|
+ buf.position(start + decryptedBytes);
|
|
|
+ buf.limit(start + length);
|
|
|
+ decryptedBytes += outBuffer.remaining();
|
|
|
buf.put(outBuffer);
|
|
|
} finally {
|
|
|
- padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv);
|
|
|
+ padding = afterDecryption(decryptor, inBuffer,
|
|
|
+ streamOffset - (length - decryptedBytes), iv);
|
|
|
}
|
|
|
}
|
|
|
- buf.position(pos);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -572,14 +671,13 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
Preconditions.checkArgument(targetPos >= 0,
|
|
|
"Cannot seek to negative offset.");
|
|
|
checkStream();
|
|
|
- try {
|
|
|
- boolean result = ((Seekable) in).seekToNewSource(targetPos);
|
|
|
- resetStreamOffset(targetPos);
|
|
|
- return result;
|
|
|
- } catch (ClassCastException e) {
|
|
|
+ if (!(in instanceof Seekable)) {
|
|
|
throw new UnsupportedOperationException("This stream does not support " +
|
|
|
"seekToNewSource.");
|
|
|
}
|
|
|
+ boolean result = ((Seekable) in).seekToNewSource(targetPos);
|
|
|
+ resetStreamOffset(targetPos);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -587,59 +685,59 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
EnumSet<ReadOption> opts) throws IOException,
|
|
|
UnsupportedOperationException {
|
|
|
checkStream();
|
|
|
- try {
|
|
|
- if (outBuffer.remaining() > 0) {
|
|
|
- // Have some decrypted data unread, need to reset.
|
|
|
- ((Seekable) in).seek(getPos());
|
|
|
- resetStreamOffset(getPos());
|
|
|
+ if (outBuffer.remaining() > 0) {
|
|
|
+ if (!(in instanceof Seekable)) {
|
|
|
+ throw new UnsupportedOperationException("This stream does not " +
|
|
|
+ "support seek.");
|
|
|
}
|
|
|
- final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
|
|
|
- read(bufferPool, maxLength, opts);
|
|
|
- if (buffer != null) {
|
|
|
- final int n = buffer.remaining();
|
|
|
- if (n > 0) {
|
|
|
- streamOffset += buffer.remaining(); // Read n bytes
|
|
|
- final int pos = buffer.position();
|
|
|
- decrypt(buffer, n, pos);
|
|
|
- }
|
|
|
- }
|
|
|
- return buffer;
|
|
|
- } catch (ClassCastException e) {
|
|
|
- throw new UnsupportedOperationException("This stream does not support " +
|
|
|
+ // Have some decrypted data unread, need to reset.
|
|
|
+ ((Seekable) in).seek(getPos());
|
|
|
+ resetStreamOffset(getPos());
|
|
|
+ }
|
|
|
+ if (!(in instanceof HasEnhancedByteBufferAccess)) {
|
|
|
+ throw new UnsupportedOperationException("This stream does not support " +
|
|
|
"enhanced byte buffer access.");
|
|
|
}
|
|
|
+ final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
|
|
|
+ read(bufferPool, maxLength, opts);
|
|
|
+ if (buffer != null) {
|
|
|
+ final int n = buffer.remaining();
|
|
|
+ if (n > 0) {
|
|
|
+ streamOffset += buffer.remaining(); // Read n bytes
|
|
|
+ final int pos = buffer.position();
|
|
|
+ decrypt(buffer, n, pos);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return buffer;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void releaseBuffer(ByteBuffer buffer) {
|
|
|
- try {
|
|
|
- ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
|
|
|
- } catch (ClassCastException e) {
|
|
|
+ if (!(in instanceof HasEnhancedByteBufferAccess)) {
|
|
|
throw new UnsupportedOperationException("This stream does not support " +
|
|
|
"release buffer.");
|
|
|
}
|
|
|
+ ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void setReadahead(Long readahead) throws IOException,
|
|
|
UnsupportedOperationException {
|
|
|
- try {
|
|
|
- ((CanSetReadahead) in).setReadahead(readahead);
|
|
|
- } catch (ClassCastException e) {
|
|
|
+ if (!(in instanceof CanSetReadahead)) {
|
|
|
throw new UnsupportedOperationException("This stream does not support " +
|
|
|
"setting the readahead caching strategy.");
|
|
|
}
|
|
|
+ ((CanSetReadahead) in).setReadahead(readahead);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void setDropBehind(Boolean dropCache) throws IOException,
|
|
|
UnsupportedOperationException {
|
|
|
- try {
|
|
|
- ((CanSetDropBehind) in).setDropBehind(dropCache);
|
|
|
- } catch (ClassCastException e) {
|
|
|
+ if (!(in instanceof CanSetReadahead)) {
|
|
|
throw new UnsupportedOperationException("This stream does not " +
|
|
|
"support setting the drop-behind caching setting.");
|
|
|
}
|
|
|
+ ((CanSetDropBehind) in).setDropBehind(dropCache);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -737,11 +835,17 @@ public class CryptoInputStream extends FilterInputStream implements
|
|
|
@Override
|
|
|
public boolean hasCapability(String capability) {
|
|
|
switch (StringUtils.toLowerCase(capability)) {
|
|
|
+ case StreamCapabilities.UNBUFFER:
|
|
|
+ return true;
|
|
|
case StreamCapabilities.READAHEAD:
|
|
|
case StreamCapabilities.DROPBEHIND:
|
|
|
- case StreamCapabilities.UNBUFFER:
|
|
|
case StreamCapabilities.READBYTEBUFFER:
|
|
|
- return true;
|
|
|
+ case StreamCapabilities.PREADBYTEBUFFER:
|
|
|
+ if (!(in instanceof StreamCapabilities)) {
|
|
|
+ throw new UnsupportedOperationException("This stream does not expose " +
|
|
|
+ "its stream capabilities.");
|
|
|
+ }
|
|
|
+ return ((StreamCapabilities) in).hasCapability(capability);
|
|
|
default:
|
|
|
return false;
|
|
|
}
|