|
@@ -24,10 +24,8 @@ import java.io.ByteArrayOutputStream;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.File;
|
|
|
-import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
-import java.io.FilterInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
@@ -42,10 +40,11 @@ import org.apache.hadoop.fs.FSExceptionMessages;
|
|
|
import org.apache.hadoop.util.DirectBufferPool;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
|
|
|
+import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
|
|
|
|
|
|
/**
|
|
|
* Set of classes to support output streaming into blocks which are then
|
|
|
- * uploaded as partitions.
|
|
|
+ * uploaded as to S3 as a single PUT, or as part of a multipart request.
|
|
|
*/
|
|
|
final class S3ADataBlocks {
|
|
|
|
|
@@ -96,6 +95,70 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The output information for an upload.
|
|
|
+ * It can be one of a file or an input stream.
|
|
|
+ * When closed, any stream is closed. Any source file is untouched.
|
|
|
+ */
|
|
|
+ static final class BlockUploadData implements Closeable {
|
|
|
+ private final File file;
|
|
|
+ private final InputStream uploadStream;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * File constructor; input stream will be null.
|
|
|
+ * @param file file to upload
|
|
|
+ */
|
|
|
+ BlockUploadData(File file) {
|
|
|
+ Preconditions.checkArgument(file.exists(), "No file: " + file);
|
|
|
+ this.file = file;
|
|
|
+ this.uploadStream = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stream constructor, file field will be null.
|
|
|
+ * @param uploadStream stream to upload
|
|
|
+ */
|
|
|
+ BlockUploadData(InputStream uploadStream) {
|
|
|
+ Preconditions.checkNotNull(uploadStream, "rawUploadStream");
|
|
|
+ this.uploadStream = uploadStream;
|
|
|
+ this.file = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Predicate: does this instance contain a file reference.
|
|
|
+ * @return true if there is a file.
|
|
|
+ */
|
|
|
+ boolean hasFile() {
|
|
|
+ return file != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the file, if there is one.
|
|
|
+ * @return the file for uploading, or null.
|
|
|
+ */
|
|
|
+ File getFile() {
|
|
|
+ return file;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the raw upload stream, if the object was
|
|
|
+ * created with one.
|
|
|
+ * @return the upload stream or null.
|
|
|
+ */
|
|
|
+ InputStream getUploadStream() {
|
|
|
+ return uploadStream;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Close: closes any upload stream provided in the constructor.
|
|
|
+ * @throws IOException inherited exception
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ closeAll(LOG, uploadStream);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Base class for block factories.
|
|
|
*/
|
|
@@ -110,15 +173,21 @@ final class S3ADataBlocks {
|
|
|
|
|
|
/**
|
|
|
* Create a block.
|
|
|
+ *
|
|
|
+ * @param index index of block
|
|
|
* @param limit limit of the block.
|
|
|
+ * @param statistics stats to work with
|
|
|
* @return a new block.
|
|
|
*/
|
|
|
- abstract DataBlock create(int limit) throws IOException;
|
|
|
+ abstract DataBlock create(long index, int limit,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics)
|
|
|
+ throws IOException;
|
|
|
|
|
|
/**
|
|
|
* Implement any close/cleanup operation.
|
|
|
* Base class is a no-op
|
|
|
- * @throws IOException -ideally, it shouldn't.
|
|
|
+ * @throws IOException Inherited exception; implementations should
|
|
|
+ * avoid raising it.
|
|
|
*/
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
@@ -140,6 +209,14 @@ final class S3ADataBlocks {
|
|
|
enum DestState {Writing, Upload, Closed}
|
|
|
|
|
|
private volatile DestState state = Writing;
|
|
|
+ protected final long index;
|
|
|
+ protected final S3AInstrumentation.OutputStreamStatistics statistics;
|
|
|
+
|
|
|
+ protected DataBlock(long index,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics) {
|
|
|
+ this.index = index;
|
|
|
+ this.statistics = statistics;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Atomically enter a state, verifying current state.
|
|
@@ -243,8 +320,8 @@ final class S3ADataBlocks {
|
|
|
* @return the stream
|
|
|
* @throws IOException trouble
|
|
|
*/
|
|
|
- InputStream startUpload() throws IOException {
|
|
|
- LOG.debug("Start datablock upload");
|
|
|
+ BlockUploadData startUpload() throws IOException {
|
|
|
+ LOG.debug("Start datablock[{}] upload", index);
|
|
|
enterState(Writing, Upload);
|
|
|
return null;
|
|
|
}
|
|
@@ -278,6 +355,23 @@ final class S3ADataBlocks {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * A block has been allocated.
|
|
|
+ */
|
|
|
+ protected void blockAllocated() {
|
|
|
+ if (statistics != null) {
|
|
|
+ statistics.blockAllocated();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A block has been released.
|
|
|
+ */
|
|
|
+ protected void blockReleased() {
|
|
|
+ if (statistics != null) {
|
|
|
+ statistics.blockReleased();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// ====================================================================
|
|
@@ -292,8 +386,10 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- DataBlock create(int limit) throws IOException {
|
|
|
- return new ByteArrayBlock(limit);
|
|
|
+ DataBlock create(long index, int limit,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics)
|
|
|
+ throws IOException {
|
|
|
+ return new ByteArrayBlock(0, limit, statistics);
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -334,9 +430,13 @@ final class S3ADataBlocks {
|
|
|
// cache data size so that it is consistent after the buffer is reset.
|
|
|
private Integer dataSize;
|
|
|
|
|
|
- ByteArrayBlock(int limit) {
|
|
|
+ ByteArrayBlock(long index,
|
|
|
+ int limit,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics) {
|
|
|
+ super(index, statistics);
|
|
|
this.limit = limit;
|
|
|
buffer = new S3AByteArrayOutputStream(limit);
|
|
|
+ blockAllocated();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -349,12 +449,12 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- InputStream startUpload() throws IOException {
|
|
|
+ BlockUploadData startUpload() throws IOException {
|
|
|
super.startUpload();
|
|
|
dataSize = buffer.size();
|
|
|
ByteArrayInputStream bufferData = buffer.getInputStream();
|
|
|
buffer = null;
|
|
|
- return bufferData;
|
|
|
+ return new BlockUploadData(bufferData);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -378,12 +478,14 @@ final class S3ADataBlocks {
|
|
|
@Override
|
|
|
protected void innerClose() {
|
|
|
buffer = null;
|
|
|
+ blockReleased();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return "ByteArrayBlock{" +
|
|
|
- "state=" + getState() +
|
|
|
+ return "ByteArrayBlock{"
|
|
|
+ +"index=" + index +
|
|
|
+ ", state=" + getState() +
|
|
|
", limit=" + limit +
|
|
|
", dataSize=" + dataSize +
|
|
|
'}';
|
|
@@ -395,12 +497,6 @@ final class S3ADataBlocks {
|
|
|
/**
|
|
|
* Stream via Direct ByteBuffers; these are allocated off heap
|
|
|
* via {@link DirectBufferPool}.
|
|
|
- * This is actually the most complex of all the block factories,
|
|
|
- * due to the need to explicitly recycle buffers; in comparison, the
|
|
|
- * {@link DiskBlock} buffer delegates the work of deleting files to
|
|
|
- * the {@link DiskBlock.FileDeletingInputStream}. Here the
|
|
|
- * input stream {@link ByteBufferInputStream} has a similar task, along
|
|
|
- * with the foundational work of streaming data from a byte array.
|
|
|
*/
|
|
|
|
|
|
static class ByteBufferBlockFactory extends BlockFactory {
|
|
@@ -413,8 +509,10 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- ByteBufferBlock create(int limit) throws IOException {
|
|
|
- return new ByteBufferBlock(limit);
|
|
|
+ ByteBufferBlock create(long index, int limit,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics)
|
|
|
+ throws IOException {
|
|
|
+ return new ByteBufferBlock(index, limit, statistics);
|
|
|
}
|
|
|
|
|
|
private ByteBuffer requestBuffer(int limit) {
|
|
@@ -446,21 +544,27 @@ final class S3ADataBlocks {
|
|
|
|
|
|
/**
|
|
|
* A DataBlock which requests a buffer from pool on creation; returns
|
|
|
- * it when the output stream is closed.
|
|
|
+ * it when it is closed.
|
|
|
*/
|
|
|
class ByteBufferBlock extends DataBlock {
|
|
|
- private ByteBuffer buffer;
|
|
|
+ private ByteBuffer blockBuffer;
|
|
|
private final int bufferSize;
|
|
|
// cache data size so that it is consistent after the buffer is reset.
|
|
|
private Integer dataSize;
|
|
|
|
|
|
/**
|
|
|
* Instantiate. This will request a ByteBuffer of the desired size.
|
|
|
+ * @param index block index
|
|
|
* @param bufferSize buffer size
|
|
|
+ * @param statistics statistics to update
|
|
|
*/
|
|
|
- ByteBufferBlock(int bufferSize) {
|
|
|
+ ByteBufferBlock(long index,
|
|
|
+ int bufferSize,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics) {
|
|
|
+ super(index, statistics);
|
|
|
this.bufferSize = bufferSize;
|
|
|
- buffer = requestBuffer(bufferSize);
|
|
|
+ blockBuffer = requestBuffer(bufferSize);
|
|
|
+ blockAllocated();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -473,13 +577,14 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- ByteBufferInputStream startUpload() throws IOException {
|
|
|
+ BlockUploadData startUpload() throws IOException {
|
|
|
super.startUpload();
|
|
|
dataSize = bufferCapacityUsed();
|
|
|
// set the buffer up from reading from the beginning
|
|
|
- buffer.limit(buffer.position());
|
|
|
- buffer.position(0);
|
|
|
- return new ByteBufferInputStream(dataSize, buffer);
|
|
|
+ blockBuffer.limit(blockBuffer.position());
|
|
|
+ blockBuffer.position(0);
|
|
|
+ return new BlockUploadData(
|
|
|
+ new ByteBufferInputStream(dataSize, blockBuffer));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -489,182 +594,190 @@ final class S3ADataBlocks {
|
|
|
|
|
|
@Override
|
|
|
public int remainingCapacity() {
|
|
|
- return buffer != null ? buffer.remaining() : 0;
|
|
|
+ return blockBuffer != null ? blockBuffer.remaining() : 0;
|
|
|
}
|
|
|
|
|
|
private int bufferCapacityUsed() {
|
|
|
- return buffer.capacity() - buffer.remaining();
|
|
|
+ return blockBuffer.capacity() - blockBuffer.remaining();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
int write(byte[] b, int offset, int len) throws IOException {
|
|
|
super.write(b, offset, len);
|
|
|
int written = Math.min(remainingCapacity(), len);
|
|
|
- buffer.put(b, offset, written);
|
|
|
+ blockBuffer.put(b, offset, written);
|
|
|
return written;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Closing the block will release the buffer.
|
|
|
+ */
|
|
|
@Override
|
|
|
protected void innerClose() {
|
|
|
- buffer = null;
|
|
|
+ if (blockBuffer != null) {
|
|
|
+ blockReleased();
|
|
|
+ releaseBuffer(blockBuffer);
|
|
|
+ blockBuffer = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return "ByteBufferBlock{"
|
|
|
- + "state=" + getState() +
|
|
|
+ + "index=" + index +
|
|
|
+ ", state=" + getState() +
|
|
|
", dataSize=" + dataSize() +
|
|
|
", limit=" + bufferSize +
|
|
|
", remainingCapacity=" + remainingCapacity() +
|
|
|
'}';
|
|
|
}
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Provide an input stream from a byte buffer; supporting
|
|
|
- * {@link #mark(int)}, which is required to enable replay of failed
|
|
|
- * PUT attempts.
|
|
|
- * This input stream returns the buffer to the pool afterwards.
|
|
|
- */
|
|
|
- class ByteBufferInputStream extends InputStream {
|
|
|
+ /**
|
|
|
+ * Provide an input stream from a byte buffer; supporting
|
|
|
+ * {@link #mark(int)}, which is required to enable replay of failed
|
|
|
+ * PUT attempts.
|
|
|
+ */
|
|
|
+ class ByteBufferInputStream extends InputStream {
|
|
|
|
|
|
- private final int size;
|
|
|
- private ByteBuffer byteBuffer;
|
|
|
+ private final int size;
|
|
|
+ private ByteBuffer byteBuffer;
|
|
|
|
|
|
- ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
|
|
|
- LOG.debug("Creating ByteBufferInputStream of size {}", size);
|
|
|
- this.size = size;
|
|
|
- this.byteBuffer = byteBuffer;
|
|
|
- }
|
|
|
+ ByteBufferInputStream(int size,
|
|
|
+ ByteBuffer byteBuffer) {
|
|
|
+ LOG.debug("Creating ByteBufferInputStream of size {}", size);
|
|
|
+ this.size = size;
|
|
|
+ this.byteBuffer = byteBuffer;
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Return the buffer to the pool after the stream is closed.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void close() {
|
|
|
- if (byteBuffer != null) {
|
|
|
- LOG.debug("releasing buffer");
|
|
|
- releaseBuffer(byteBuffer);
|
|
|
+ /**
|
|
|
+ * After the stream is closed, set the local reference to the byte
|
|
|
+ * buffer to null; this guarantees that future attempts to use
|
|
|
+ * stream methods will fail.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public synchronized void close() {
|
|
|
+ LOG.debug("ByteBufferInputStream.close() for {}",
|
|
|
+ ByteBufferBlock.super.toString());
|
|
|
byteBuffer = null;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- /**
|
|
|
- * Verify that the stream is open.
|
|
|
- * @throws IOException if the stream is closed
|
|
|
- */
|
|
|
- private void verifyOpen() throws IOException {
|
|
|
- if (byteBuffer == null) {
|
|
|
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
+ /**
|
|
|
+ * Verify that the stream is open.
|
|
|
+ * @throws IOException if the stream is closed
|
|
|
+ */
|
|
|
+ private void verifyOpen() throws IOException {
|
|
|
+ if (byteBuffer == null) {
|
|
|
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- public synchronized int read() throws IOException {
|
|
|
- if (available() > 0) {
|
|
|
- return byteBuffer.get() & 0xFF;
|
|
|
- } else {
|
|
|
- return -1;
|
|
|
+ public synchronized int read() throws IOException {
|
|
|
+ if (available() > 0) {
|
|
|
+ return byteBuffer.get() & 0xFF;
|
|
|
+ } else {
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized long skip(long offset) throws IOException {
|
|
|
- verifyOpen();
|
|
|
- long newPos = position() + offset;
|
|
|
- if (newPos < 0) {
|
|
|
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
|
|
+ @Override
|
|
|
+ public synchronized long skip(long offset) throws IOException {
|
|
|
+ verifyOpen();
|
|
|
+ long newPos = position() + offset;
|
|
|
+ if (newPos < 0) {
|
|
|
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
|
|
+ }
|
|
|
+ if (newPos > size) {
|
|
|
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
|
|
|
+ }
|
|
|
+ byteBuffer.position((int) newPos);
|
|
|
+ return newPos;
|
|
|
}
|
|
|
- if (newPos > size) {
|
|
|
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized int available() {
|
|
|
+ Preconditions.checkState(byteBuffer != null,
|
|
|
+ FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
+ return byteBuffer.remaining();
|
|
|
}
|
|
|
- byteBuffer.position((int) newPos);
|
|
|
- return newPos;
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized int available() {
|
|
|
- Preconditions.checkState(byteBuffer != null,
|
|
|
- FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
- return byteBuffer.remaining();
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Get the current buffer position.
|
|
|
+ * @return the buffer position
|
|
|
+ */
|
|
|
+ public synchronized int position() {
|
|
|
+ return byteBuffer.position();
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Get the current buffer position.
|
|
|
- * @return the buffer position
|
|
|
- */
|
|
|
- public synchronized int position() {
|
|
|
- return byteBuffer.position();
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Check if there is data left.
|
|
|
+ * @return true if there is data remaining in the buffer.
|
|
|
+ */
|
|
|
+ public synchronized boolean hasRemaining() {
|
|
|
+ return byteBuffer.hasRemaining();
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Check if there is data left.
|
|
|
- * @return true if there is data remaining in the buffer.
|
|
|
- */
|
|
|
- public synchronized boolean hasRemaining() {
|
|
|
- return byteBuffer.hasRemaining();
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public synchronized void mark(int readlimit) {
|
|
|
+ LOG.debug("mark at {}", position());
|
|
|
+ byteBuffer.mark();
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized void mark(int readlimit) {
|
|
|
- LOG.debug("mark at {}", position());
|
|
|
- byteBuffer.mark();
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public synchronized void reset() throws IOException {
|
|
|
+ LOG.debug("reset");
|
|
|
+ byteBuffer.reset();
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized void reset() throws IOException {
|
|
|
- LOG.debug("reset");
|
|
|
- byteBuffer.reset();
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public boolean markSupported() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public boolean markSupported() {
|
|
|
- return true;
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Read in data.
|
|
|
+ * @param b destination buffer
|
|
|
+ * @param offset offset within the buffer
|
|
|
+ * @param length length of bytes to read
|
|
|
+ * @throws EOFException if the position is negative
|
|
|
+ * @throws IndexOutOfBoundsException if there isn't space for the
|
|
|
+ * amount of data requested.
|
|
|
+ * @throws IllegalArgumentException other arguments are invalid.
|
|
|
+ */
|
|
|
+ @SuppressWarnings("NullableProblems")
|
|
|
+ public synchronized int read(byte[] b, int offset, int length)
|
|
|
+ throws IOException {
|
|
|
+ Preconditions.checkArgument(length >= 0, "length is negative");
|
|
|
+ Preconditions.checkArgument(b != null, "Null buffer");
|
|
|
+ if (b.length - offset < length) {
|
|
|
+ throw new IndexOutOfBoundsException(
|
|
|
+ FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
|
|
|
+ + ": request length =" + length
|
|
|
+ + ", with offset =" + offset
|
|
|
+ + "; buffer capacity =" + (b.length - offset));
|
|
|
+ }
|
|
|
+ verifyOpen();
|
|
|
+ if (!hasRemaining()) {
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Read in data.
|
|
|
- * @param buffer destination buffer
|
|
|
- * @param offset offset within the buffer
|
|
|
- * @param length length of bytes to read
|
|
|
- * @throws EOFException if the position is negative
|
|
|
- * @throws IndexOutOfBoundsException if there isn't space for the
|
|
|
- * amount of data requested.
|
|
|
- * @throws IllegalArgumentException other arguments are invalid.
|
|
|
- */
|
|
|
- @SuppressWarnings("NullableProblems")
|
|
|
- public synchronized int read(byte[] buffer, int offset, int length)
|
|
|
- throws IOException {
|
|
|
- Preconditions.checkArgument(length >= 0, "length is negative");
|
|
|
- Preconditions.checkArgument(buffer != null, "Null buffer");
|
|
|
- if (buffer.length - offset < length) {
|
|
|
- throw new IndexOutOfBoundsException(
|
|
|
- FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
|
|
|
- + ": request length =" + length
|
|
|
- + ", with offset =" + offset
|
|
|
- + "; buffer capacity =" + (buffer.length - offset));
|
|
|
+ int toRead = Math.min(length, available());
|
|
|
+ byteBuffer.get(b, offset, toRead);
|
|
|
+ return toRead;
|
|
|
}
|
|
|
- verifyOpen();
|
|
|
- if (!hasRemaining()) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- int toRead = Math.min(length, available());
|
|
|
- byteBuffer.get(buffer, offset, toRead);
|
|
|
- return toRead;
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- final StringBuilder sb = new StringBuilder(
|
|
|
- "ByteBufferInputStream{");
|
|
|
- sb.append("size=").append(size);
|
|
|
- ByteBuffer buffer = this.byteBuffer;
|
|
|
- if (buffer != null) {
|
|
|
- sb.append(", available=").append(buffer.remaining());
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ final StringBuilder sb = new StringBuilder(
|
|
|
+ "ByteBufferInputStream{");
|
|
|
+ sb.append("size=").append(size);
|
|
|
+ ByteBuffer buf = this.byteBuffer;
|
|
|
+ if (buf != null) {
|
|
|
+ sb.append(", available=").append(buf.remaining());
|
|
|
+ }
|
|
|
+ sb.append(", ").append(ByteBufferBlock.super.toString());
|
|
|
+ sb.append('}');
|
|
|
+ return sb.toString();
|
|
|
}
|
|
|
- sb.append('}');
|
|
|
- return sb.toString();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -681,22 +794,29 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Create a temp file and a block which writes to it.
|
|
|
+ * Create a temp file and a {@link DiskBlock} instance to manage it.
|
|
|
+ *
|
|
|
+ * @param index block index
|
|
|
* @param limit limit of the block.
|
|
|
+ * @param statistics statistics to update
|
|
|
* @return the new block
|
|
|
* @throws IOException IO problems
|
|
|
*/
|
|
|
@Override
|
|
|
- DataBlock create(int limit) throws IOException {
|
|
|
+ DataBlock create(long index,
|
|
|
+ int limit,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics)
|
|
|
+ throws IOException {
|
|
|
File destFile = getOwner()
|
|
|
- .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
|
|
|
- return new DiskBlock(destFile, limit);
|
|
|
+ .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
|
|
|
+ limit, getOwner().getConf());
|
|
|
+ return new DiskBlock(destFile, limit, index, statistics);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Stream to a file.
|
|
|
- * This will stop at the limit; the caller is expected to create a new block
|
|
|
+ * This will stop at the limit; the caller is expected to create a new block.
|
|
|
*/
|
|
|
static class DiskBlock extends DataBlock {
|
|
|
|
|
@@ -704,12 +824,17 @@ final class S3ADataBlocks {
|
|
|
private final File bufferFile;
|
|
|
private final int limit;
|
|
|
private BufferedOutputStream out;
|
|
|
- private InputStream uploadStream;
|
|
|
+ private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
|
|
|
- DiskBlock(File bufferFile, int limit)
|
|
|
+ DiskBlock(File bufferFile,
|
|
|
+ int limit,
|
|
|
+ long index,
|
|
|
+ S3AInstrumentation.OutputStreamStatistics statistics)
|
|
|
throws FileNotFoundException {
|
|
|
+ super(index, statistics);
|
|
|
this.limit = limit;
|
|
|
this.bufferFile = bufferFile;
|
|
|
+ blockAllocated();
|
|
|
out = new BufferedOutputStream(new FileOutputStream(bufferFile));
|
|
|
}
|
|
|
|
|
@@ -738,7 +863,7 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- InputStream startUpload() throws IOException {
|
|
|
+ BlockUploadData startUpload() throws IOException {
|
|
|
super.startUpload();
|
|
|
try {
|
|
|
out.flush();
|
|
@@ -746,8 +871,7 @@ final class S3ADataBlocks {
|
|
|
out.close();
|
|
|
out = null;
|
|
|
}
|
|
|
- uploadStream = new FileInputStream(bufferFile);
|
|
|
- return new FileDeletingInputStream(uploadStream);
|
|
|
+ return new BlockUploadData(bufferFile);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -755,6 +879,7 @@ final class S3ADataBlocks {
|
|
|
* exists.
|
|
|
* @throws IOException IO problems
|
|
|
*/
|
|
|
+ @SuppressWarnings("UnnecessaryDefault")
|
|
|
@Override
|
|
|
protected void innerClose() throws IOException {
|
|
|
final DestState state = getState();
|
|
@@ -763,20 +888,19 @@ final class S3ADataBlocks {
|
|
|
case Writing:
|
|
|
if (bufferFile.exists()) {
|
|
|
// file was not uploaded
|
|
|
- LOG.debug("Deleting buffer file as upload did not start");
|
|
|
- boolean deleted = bufferFile.delete();
|
|
|
- if (!deleted && bufferFile.exists()) {
|
|
|
- LOG.warn("Failed to delete buffer file {}", bufferFile);
|
|
|
- }
|
|
|
+ LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
|
|
|
+ index);
|
|
|
+ closeBlock();
|
|
|
}
|
|
|
break;
|
|
|
|
|
|
case Upload:
|
|
|
- LOG.debug("Buffer file {} exists —close upload stream", bufferFile);
|
|
|
+ LOG.debug("Block[{}]: Buffer file {} exists —close upload stream",
|
|
|
+ index, bufferFile);
|
|
|
break;
|
|
|
|
|
|
case Closed:
|
|
|
- // no-op
|
|
|
+ closeBlock();
|
|
|
break;
|
|
|
|
|
|
default:
|
|
@@ -798,7 +922,8 @@ final class S3ADataBlocks {
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
String sb = "FileBlock{"
|
|
|
- + "destFile=" + bufferFile +
|
|
|
+ + "index=" + index
|
|
|
+ + ", destFile=" + bufferFile +
|
|
|
", state=" + getState() +
|
|
|
", dataSize=" + dataSize() +
|
|
|
", limit=" + limit +
|
|
@@ -807,31 +932,20 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * An input stream which deletes the buffer file when closed.
|
|
|
+ * Close the block.
|
|
|
+ * This will delete the block's buffer file if the block has
|
|
|
+ * not previously been closed.
|
|
|
*/
|
|
|
- private final class FileDeletingInputStream extends FilterInputStream {
|
|
|
- private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
-
|
|
|
- FileDeletingInputStream(InputStream source) {
|
|
|
- super(source);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Delete the input file when closed.
|
|
|
- * @throws IOException IO problem
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void close() throws IOException {
|
|
|
- try {
|
|
|
- super.close();
|
|
|
- } finally {
|
|
|
- if (!closed.getAndSet(true)) {
|
|
|
- if (!bufferFile.delete()) {
|
|
|
- LOG.warn("delete({}) returned false",
|
|
|
- bufferFile.getAbsoluteFile());
|
|
|
- }
|
|
|
- }
|
|
|
+ void closeBlock() {
|
|
|
+ LOG.debug("block[{}]: closeBlock()", index);
|
|
|
+ if (!closed.getAndSet(true)) {
|
|
|
+ blockReleased();
|
|
|
+ if (!bufferFile.delete() && bufferFile.exists()) {
|
|
|
+ LOG.warn("delete({}) returned false",
|
|
|
+ bufferFile.getAbsoluteFile());
|
|
|
}
|
|
|
+ } else {
|
|
|
+ LOG.debug("block[{}]: skipping re-entrant closeBlock()", index);
|
|
|
}
|
|
|
}
|
|
|
}
|