|
@@ -22,122 +22,256 @@ import java.io.ByteArrayInputStream;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Calendar;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Locale;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.UUID;
|
|
|
import java.util.Random;
|
|
|
-import java.util.TimeZone;
|
|
|
+import java.util.concurrent.ConcurrentLinkedDeque;
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
|
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
|
import org.apache.commons.codec.binary.Base64;
|
|
|
+import org.apache.hadoop.fs.StreamCapabilities;
|
|
|
+import org.apache.hadoop.fs.Syncable;
|
|
|
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
|
|
|
-import org.eclipse.jetty.util.log.Log;
|
|
|
+import org.apache.hadoop.io.ElasticByteBufferPool;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.microsoft.azure.storage.AccessCondition;
|
|
|
import com.microsoft.azure.storage.OperationContext;
|
|
|
import com.microsoft.azure.storage.StorageException;
|
|
|
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
|
|
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
|
|
import com.microsoft.azure.storage.blob.BlockEntry;
|
|
|
import com.microsoft.azure.storage.blob.BlockListingFilter;
|
|
|
+import com.microsoft.azure.storage.blob.BlockSearchMode;
|
|
|
+
|
|
|
+import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
|
|
|
+import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
|
|
|
|
|
|
/**
|
|
|
- * Stream object that implememnts append for Block Blobs in WASB.
|
|
|
+ * Stream object that implements append for Block Blobs in WASB.
|
|
|
+ *
|
|
|
+ * The stream object implements hflush/hsync and block compaction. Block
|
|
|
+ * compaction is the process of replacing a sequence of small blocks with one
|
|
|
+ * big block. Azure Block blobs supports up to 50000 blocks and every
|
|
|
+ * hflush/hsync generates one block. When the number of blocks is above 32000,
|
|
|
+ * the process of compaction decreases the total number of blocks, if possible.
|
|
|
+ * If compaction is disabled, hflush/hsync are empty functions.
|
|
|
+ *
|
|
|
+ * The stream object uses background threads for uploading the blocks and the
|
|
|
+ * block blob list. Blocks can be uploaded concurrently. However, when the block
|
|
|
+ * list is uploaded, block uploading should stop. If a block is uploaded before
|
|
|
+ * the block list and the block id is not in the list, the block will be lost.
|
|
|
+ * If the block is uploaded after the block list and the block id is in the
|
|
|
+ * list, the block list upload will fail. The exclusive access for the block
|
|
|
+ * list upload is managed by uploadingSemaphore.
|
|
|
*/
|
|
|
-public class BlockBlobAppendStream extends OutputStream {
|
|
|
|
|
|
+public class BlockBlobAppendStream extends OutputStream implements Syncable,
|
|
|
+ StreamCapabilities {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The name of the blob/file.
|
|
|
+ */
|
|
|
private final String key;
|
|
|
- private final int bufferSize;
|
|
|
- private ByteArrayOutputStream outBuffer;
|
|
|
- private final CloudBlockBlobWrapper blob;
|
|
|
- private final OperationContext opContext;
|
|
|
|
|
|
/**
|
|
|
- * Variable to track if the stream has been closed.
|
|
|
+ * This variable tracks if this is new blob or existing one.
|
|
|
+ */
|
|
|
+ private boolean blobExist;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * When the blob exist, to to prevent concurrent write we take a lease.
|
|
|
+ * Taking a lease is not necessary for new blobs.
|
|
|
*/
|
|
|
- private boolean closed = false;
|
|
|
+ private SelfRenewingLease lease = null;
|
|
|
|
|
|
/**
|
|
|
- * Variable to track if the append lease is released.
|
|
|
+ * The support for process of compaction is optional.
|
|
|
*/
|
|
|
+ private final boolean compactionEnabled;
|
|
|
|
|
|
- private volatile boolean leaseFreed;
|
|
|
+ /**
|
|
|
+ * The number of blocks above each block compaction is triggered.
|
|
|
+ */
|
|
|
+ private static final int DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT = 32000;
|
|
|
|
|
|
/**
|
|
|
- * Variable to track if the append stream has been
|
|
|
- * initialized.
|
|
|
+ * The number of blocks above each block compaction is triggered.
|
|
|
*/
|
|
|
+ private int activateCompactionBlockCount
|
|
|
+ = DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT;
|
|
|
|
|
|
- private boolean initialized = false;
|
|
|
+ /**
|
|
|
+ * The size of the output buffer. Writes store the data in outBuffer until
|
|
|
+ * either the size is above maxBlockSize or hflush/hsync is called.
|
|
|
+ */
|
|
|
+ private final AtomicInteger maxBlockSize;
|
|
|
|
|
|
/**
|
|
|
- * Last IOException encountered
|
|
|
+ * The current buffer where writes are stored.
|
|
|
*/
|
|
|
- private volatile IOException lastError = null;
|
|
|
+ private ByteBuffer outBuffer;
|
|
|
|
|
|
/**
|
|
|
- * List to keep track of the uncommitted azure storage
|
|
|
- * block ids
|
|
|
+ * The size of the blob that has been successfully stored in the Azure Blob
|
|
|
+ * service.
|
|
|
*/
|
|
|
- private final List<BlockEntry> uncommittedBlockEntries;
|
|
|
+ private final AtomicLong committedBlobLength = new AtomicLong(0);
|
|
|
|
|
|
- private static final int UNSET_BLOCKS_COUNT = -1;
|
|
|
+ /**
|
|
|
+ * Position of last block in the blob.
|
|
|
+ */
|
|
|
+ private volatile long blobLength = 0;
|
|
|
|
|
|
/**
|
|
|
- * Variable to hold the next block id to be used for azure
|
|
|
- * storage blocks.
|
|
|
+ * Minutes waiting before the close operation timed out.
|
|
|
*/
|
|
|
- private long nextBlockCount = UNSET_BLOCKS_COUNT;
|
|
|
+ private static final int CLOSE_UPLOAD_DELAY = 10;
|
|
|
|
|
|
/**
|
|
|
- * Variable to hold the block id prefix to be used for azure
|
|
|
- * storage blocks from azure-storage-java sdk version 4.2.0 onwards
|
|
|
+ * Keep alive time for the threadpool.
|
|
|
*/
|
|
|
- private String blockIdPrefix = null;
|
|
|
+ private static final int THREADPOOL_KEEP_ALIVE = 30;
|
|
|
+ /**
|
|
|
+ * Azure Block Blob used for the stream.
|
|
|
+ */
|
|
|
+ private final CloudBlockBlobWrapper blob;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Azure Storage operation context.
|
|
|
+ */
|
|
|
+ private final OperationContext opContext;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Commands send from client calls to the background thread pool.
|
|
|
+ */
|
|
|
+ private abstract class UploadCommand {
|
|
|
+
|
|
|
+ // the blob offset for the command
|
|
|
+ private final long commandBlobOffset;
|
|
|
+
|
|
|
+ // command completion latch
|
|
|
+ private final CountDownLatch completed = new CountDownLatch(1);
|
|
|
+
|
|
|
+ UploadCommand(long offset) {
|
|
|
+ this.commandBlobOffset = offset;
|
|
|
+ }
|
|
|
+
|
|
|
+ long getCommandBlobOffset() {
|
|
|
+ return commandBlobOffset;
|
|
|
+ }
|
|
|
+
|
|
|
+ void await() throws InterruptedException {
|
|
|
+ completed.await();
|
|
|
+ }
|
|
|
+
|
|
|
+ void awaitAsDependent() throws InterruptedException {
|
|
|
+ await();
|
|
|
+ }
|
|
|
+
|
|
|
+ void setCompleted() {
|
|
|
+ completed.countDown();
|
|
|
+ }
|
|
|
|
|
|
- private final Random sequenceGenerator = new Random();
|
|
|
+ void execute() throws InterruptedException, IOException {}
|
|
|
+
|
|
|
+ void dump() {}
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The list of recent commands. Before block list is committed, all the block
|
|
|
+ * listed in the list must be uploaded. activeBlockCommands is used for
|
|
|
+ * enumerating the blocks and waiting on the latch until the block is
|
|
|
+ * uploaded.
|
|
|
+ */
|
|
|
+ private final ConcurrentLinkedQueue<UploadCommand> activeBlockCommands
|
|
|
+ = new ConcurrentLinkedQueue<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Variable to track if the stream has been closed.
|
|
|
+ */
|
|
|
+ private volatile boolean closed = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * First IOException encountered.
|
|
|
+ */
|
|
|
+ private final AtomicReference<IOException> firstError
|
|
|
+ = new AtomicReference<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Flag set when the first error has been thrown.
|
|
|
+ */
|
|
|
+ private boolean firstErrorThrown = false;
|
|
|
|
|
|
/**
|
|
|
- * Time to wait to renew lease in milliseconds
|
|
|
+ * Semaphore for serializing block uploads with NativeAzureFileSystem.
|
|
|
+ *
|
|
|
+ * The semaphore starts with number of permits equal to the number of block
|
|
|
+ * upload threads. Each block upload thread needs one permit to start the
|
|
|
+ * upload. The put block list acquires all the permits before the block list
|
|
|
+ * is committed.
|
|
|
*/
|
|
|
- private static final int LEASE_RENEWAL_PERIOD = 10000;
|
|
|
+ private final Semaphore uploadingSemaphore = new Semaphore(
|
|
|
+ MAX_NUMBER_THREADS_IN_THREAD_POOL,
|
|
|
+ true);
|
|
|
|
|
|
/**
|
|
|
- * Number of times to retry for lease renewal
|
|
|
+ * Queue storing buffers with the size of the Azure block ready for
|
|
|
+ * reuse. The pool allows reusing the blocks instead of allocating new
|
|
|
+ * blocks. After the data is sent to the service, the buffer is returned
|
|
|
+ * back to the queue
|
|
|
*/
|
|
|
- private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
|
|
|
+ private final ElasticByteBufferPool poolReadyByteBuffers
|
|
|
+ = new ElasticByteBufferPool();
|
|
|
|
|
|
/**
|
|
|
- * Time to wait before retrying to set the lease
|
|
|
+ * The blob's block list.
|
|
|
*/
|
|
|
- private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
|
|
|
+ private final List<BlockEntry> blockEntries = new ArrayList<>(
|
|
|
+ DEFAULT_CAPACITY_BLOCK_ENTRIES);
|
|
|
+
|
|
|
+ private static final int DEFAULT_CAPACITY_BLOCK_ENTRIES = 1024;
|
|
|
|
|
|
/**
|
|
|
- * Metadata key used on the blob to indicate append lease is active
|
|
|
+ * The uncommitted blob's block list.
|
|
|
*/
|
|
|
- public static final String APPEND_LEASE = "append_lease";
|
|
|
+ private final ConcurrentLinkedDeque<BlockEntry> uncommittedBlockEntries
|
|
|
+ = new ConcurrentLinkedDeque<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Variable to hold the next block id to be used for azure storage blocks.
|
|
|
+ */
|
|
|
+ private static final int UNSET_BLOCKS_COUNT = -1;
|
|
|
+ private long nextBlockCount = UNSET_BLOCKS_COUNT;
|
|
|
|
|
|
/**
|
|
|
- * Timeout value for the append lease in millisecs. If the lease is not
|
|
|
- * renewed within 30 seconds then another thread can acquire the append lease
|
|
|
- * on the blob
|
|
|
+ * Variable to hold the block id prefix to be used for azure storage blocks.
|
|
|
*/
|
|
|
- public static final int APPEND_LEASE_TIMEOUT = 30000;
|
|
|
+ private String blockIdPrefix = null;
|
|
|
|
|
|
/**
|
|
|
- * Metdata key used on the blob to indicate last modified time of append lease
|
|
|
+ * Maximum number of threads in block upload thread pool.
|
|
|
*/
|
|
|
- public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
|
|
|
+ private static final int MAX_NUMBER_THREADS_IN_THREAD_POOL = 4;
|
|
|
|
|
|
/**
|
|
|
* Number of times block upload needs is retried.
|
|
@@ -145,16 +279,32 @@ public class BlockBlobAppendStream extends OutputStream {
|
|
|
private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
|
|
|
|
|
|
/**
|
|
|
- * Wait time between block upload retries in millisecs.
|
|
|
+ * Wait time between block upload retries in milliseconds.
|
|
|
*/
|
|
|
private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
|
|
|
|
|
|
- private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
|
|
|
+ /**
|
|
|
+ * Logger.
|
|
|
+ */
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(BlockBlobAppendStream.class);
|
|
|
|
|
|
+ /**
|
|
|
+ * The absolute maximum of blocks for a blob. It includes committed and
|
|
|
+ * temporary blocks.
|
|
|
+ */
|
|
|
private static final int MAX_BLOCK_COUNT = 100000;
|
|
|
|
|
|
+ /**
|
|
|
+ * The upload thread pool executor.
|
|
|
+ */
|
|
|
private ThreadPoolExecutor ioThreadPool;
|
|
|
|
|
|
+ /**
|
|
|
+ * Azure Storage access conditions for the blob.
|
|
|
+ */
|
|
|
+ private final AccessCondition accessCondition = new AccessCondition();
|
|
|
+
|
|
|
/**
|
|
|
* Atomic integer to provide thread id for thread names for uploader threads.
|
|
|
*/
|
|
@@ -163,106 +313,123 @@ public class BlockBlobAppendStream extends OutputStream {
|
|
|
/**
|
|
|
* Prefix to be used for thread names for uploader threads.
|
|
|
*/
|
|
|
- private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
|
|
|
-
|
|
|
- private static final String UTC_STR = "UTC";
|
|
|
+ private static final String THREAD_ID_PREFIX = "append-blockblob";
|
|
|
|
|
|
+ /**
|
|
|
+ * BlockBlobAppendStream constructor.
|
|
|
+ *
|
|
|
+ * @param blob
|
|
|
+ * Azure Block Blob
|
|
|
+ * @param aKey
|
|
|
+ * blob's name
|
|
|
+ * @param bufferSize
|
|
|
+ * the maximum size of a blob block.
|
|
|
+ * @param compactionEnabled
|
|
|
+ * is the compaction process enabled for this blob
|
|
|
+ * @param opContext
|
|
|
+ * Azure Store operation context for the blob
|
|
|
+ * @throws IOException
|
|
|
+ * if an I/O error occurs. In particular, an IOException may be
|
|
|
+ * thrown if the output stream cannot be used for append operations
|
|
|
+ */
|
|
|
public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
|
|
|
- final String aKey, final int bufferSize, final OperationContext opContext)
|
|
|
+ final String aKey,
|
|
|
+ final int bufferSize,
|
|
|
+ final boolean compactionEnabled,
|
|
|
+ final OperationContext opContext)
|
|
|
throws IOException {
|
|
|
|
|
|
- if (null == aKey || 0 == aKey.length()) {
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "Illegal argument: The key string is null or empty");
|
|
|
- }
|
|
|
-
|
|
|
- if (0 >= bufferSize) {
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "Illegal argument bufferSize cannot be zero or negative");
|
|
|
- }
|
|
|
-
|
|
|
+ Preconditions.checkArgument(StringUtils.isNotEmpty(aKey));
|
|
|
+ Preconditions.checkArgument(bufferSize >= 0);
|
|
|
|
|
|
this.blob = blob;
|
|
|
this.opContext = opContext;
|
|
|
this.key = aKey;
|
|
|
- this.bufferSize = bufferSize;
|
|
|
+ this.maxBlockSize = new AtomicInteger(bufferSize);
|
|
|
this.threadSequenceNumber = new AtomicInteger(0);
|
|
|
this.blockIdPrefix = null;
|
|
|
- setBlocksCountAndBlockIdPrefix();
|
|
|
+ this.compactionEnabled = compactionEnabled;
|
|
|
+ this.blobExist = true;
|
|
|
+ this.outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
|
|
|
|
|
|
- this.outBuffer = new ByteArrayOutputStream(bufferSize);
|
|
|
- this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
|
|
|
-
|
|
|
- // Acquire append lease on the blob.
|
|
|
try {
|
|
|
- //Set the append lease if the value of the append lease is false
|
|
|
- if (!updateBlobAppendMetadata(true, false)) {
|
|
|
- LOG.error("Unable to set Append Lease on the Blob : {} "
|
|
|
- + "Possibly because another client already has a create or append stream open on the Blob", key);
|
|
|
- throw new IOException("Unable to set Append lease on the Blob. "
|
|
|
- + "Possibly because another client already had an append stream open on the Blob.");
|
|
|
- }
|
|
|
- } catch (StorageException ex) {
|
|
|
- LOG.error("Encountered Storage exception while acquiring append "
|
|
|
- + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
|
|
|
- key, ex, ex.getErrorCode());
|
|
|
+ // download the block list
|
|
|
+ blockEntries.addAll(
|
|
|
+ blob.downloadBlockList(
|
|
|
+ BlockListingFilter.COMMITTED,
|
|
|
+ new BlobRequestOptions(),
|
|
|
+ opContext));
|
|
|
+
|
|
|
+ blobLength = blob.getProperties().getLength();
|
|
|
+
|
|
|
+ committedBlobLength.set(blobLength);
|
|
|
|
|
|
- throw new IOException(ex);
|
|
|
+ // Acquiring lease on the blob.
|
|
|
+ lease = new SelfRenewingLease(blob, true);
|
|
|
+ accessCondition.setLeaseID(lease.getLeaseID());
|
|
|
+
|
|
|
+ } catch (StorageException ex) {
|
|
|
+ if (ex.getErrorCode().equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)) {
|
|
|
+ blobExist = false;
|
|
|
+ }
|
|
|
+ else if (ex.getErrorCode().equals(
|
|
|
+ StorageErrorCodeStrings.LEASE_ALREADY_PRESENT)) {
|
|
|
+ throw new AzureException(
|
|
|
+ "Unable to set Append lease on the Blob: " + ex, ex);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ LOG.debug(
|
|
|
+ "Encountered storage exception."
|
|
|
+ + " StorageException : {} ErrorCode : {}",
|
|
|
+ ex,
|
|
|
+ ex.getErrorCode());
|
|
|
+ throw new AzureException(ex);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- leaseFreed = false;
|
|
|
+ setBlocksCountAndBlockIdPrefix(blockEntries);
|
|
|
+
|
|
|
+ this.ioThreadPool = new ThreadPoolExecutor(
|
|
|
+ MAX_NUMBER_THREADS_IN_THREAD_POOL,
|
|
|
+ MAX_NUMBER_THREADS_IN_THREAD_POOL,
|
|
|
+ THREADPOOL_KEEP_ALIVE,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(),
|
|
|
+ new UploaderThreadFactory());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method that starts an Append Lease renewer thread and the
|
|
|
- * thread pool.
|
|
|
+ * Set payload size of the stream.
|
|
|
+ * It is intended to be used for unit testing purposes only.
|
|
|
*/
|
|
|
- public synchronized void initialize() {
|
|
|
-
|
|
|
- if (initialized) {
|
|
|
- return;
|
|
|
- }
|
|
|
- /*
|
|
|
- * Start the thread for Append lease renewer.
|
|
|
- */
|
|
|
- Thread appendLeaseRenewer = new Thread(new AppendRenewer());
|
|
|
- appendLeaseRenewer.setDaemon(true);
|
|
|
- appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
|
|
|
- appendLeaseRenewer.start();
|
|
|
-
|
|
|
- /*
|
|
|
- * Parameters to ThreadPoolExecutor:
|
|
|
- * corePoolSize : the number of threads to keep in the pool, even if they are idle,
|
|
|
- * unless allowCoreThreadTimeOut is set
|
|
|
- * maximumPoolSize : the maximum number of threads to allow in the pool
|
|
|
- * keepAliveTime - when the number of threads is greater than the core,
|
|
|
- * this is the maximum time that excess idle threads will
|
|
|
- * wait for new tasks before terminating.
|
|
|
- * unit - the time unit for the keepAliveTime argument
|
|
|
- * workQueue - the queue to use for holding tasks before they are executed
|
|
|
- * This queue will hold only the Runnable tasks submitted by the execute method.
|
|
|
- */
|
|
|
- this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
|
|
|
- new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
|
|
|
+ @VisibleForTesting
|
|
|
+ synchronized void setMaxBlockSize(int size) {
|
|
|
+ maxBlockSize.set(size);
|
|
|
|
|
|
- initialized = true;
|
|
|
+ // it is for testing only so we can abandon the previously allocated
|
|
|
+ // payload
|
|
|
+ this.outBuffer = ByteBuffer.allocate(maxBlockSize.get());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the blob name.
|
|
|
- *
|
|
|
- * @return String Blob name.
|
|
|
+ * Set compaction parameters.
|
|
|
+ * It is intended to be used for unit testing purposes only.
|
|
|
*/
|
|
|
- public String getKey() {
|
|
|
- return key;
|
|
|
+ @VisibleForTesting
|
|
|
+ void setCompactionBlockCount(int activationCount) {
|
|
|
+ activateCompactionBlockCount = activationCount;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the backing blob.
|
|
|
- * @return buffer size of the stream.
|
|
|
+ * Get the list of block entries. It is used for testing purposes only.
|
|
|
+ * @return List of block entries.
|
|
|
*/
|
|
|
- public int getBufferSize() {
|
|
|
- return bufferSize;
|
|
|
+ @VisibleForTesting
|
|
|
+ List<BlockEntry> getBlockList() throws StorageException, IOException {
|
|
|
+ return blob.downloadBlockList(
|
|
|
+ BlockListingFilter.COMMITTED,
|
|
|
+ new BlobRequestOptions(),
|
|
|
+ opContext);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -282,21 +449,6 @@ public class BlockBlobAppendStream extends OutputStream {
|
|
|
write(new byte[] { (byte) (byteVal & 0xFF) });
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Writes b.length bytes from the specified byte array to this output stream.
|
|
|
- *
|
|
|
- * @param data
|
|
|
- * the byte array to write.
|
|
|
- *
|
|
|
- * @throws IOException
|
|
|
- * if an I/O error occurs. In particular, an IOException may be
|
|
|
- * thrown if the output stream has been closed.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void write(final byte[] data) throws IOException {
|
|
|
- write(data, 0, data.length);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Writes length bytes from the specified byte array starting at offset to
|
|
|
* this output stream.
|
|
@@ -312,529 +464,678 @@ public class BlockBlobAppendStream extends OutputStream {
|
|
|
* thrown if the output stream has been closed.
|
|
|
*/
|
|
|
@Override
|
|
|
- public void write(final byte[] data, final int offset, final int length)
|
|
|
+ public synchronized void write(final byte[] data, int offset, int length)
|
|
|
throws IOException {
|
|
|
+ Preconditions.checkArgument(data != null, "null data");
|
|
|
|
|
|
if (offset < 0 || length < 0 || length > data.length - offset) {
|
|
|
- throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
|
|
|
- }
|
|
|
-
|
|
|
- writeInternal(data, offset, length);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
-
|
|
|
- if (!initialized) {
|
|
|
- throw new IOException("Trying to close an uninitialized Append stream");
|
|
|
+ throw new IndexOutOfBoundsException();
|
|
|
}
|
|
|
|
|
|
if (closed) {
|
|
|
- return;
|
|
|
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
}
|
|
|
|
|
|
- if (leaseFreed) {
|
|
|
- throw new IOException(String.format("Attempting to close an append stream on blob : %s "
|
|
|
- + " that does not have lease on the Blob. Failing close", key));
|
|
|
- }
|
|
|
+ while (outBuffer.remaining() < length) {
|
|
|
+
|
|
|
+ int remaining = outBuffer.remaining();
|
|
|
+ outBuffer.put(data, offset, remaining);
|
|
|
+
|
|
|
+ // upload payload to azure storage
|
|
|
+ addBlockUploadCommand();
|
|
|
|
|
|
- if (outBuffer.size() > 0) {
|
|
|
- uploadBlockToStorage(outBuffer.toByteArray());
|
|
|
+ offset += remaining;
|
|
|
+ length -= remaining;
|
|
|
}
|
|
|
|
|
|
- ioThreadPool.shutdown();
|
|
|
+ outBuffer.put(data, offset, length);
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
|
|
|
- LOG.error("Time out occurred while waiting for IO request to finish in append"
|
|
|
- + " for blob : {}", key);
|
|
|
- NativeAzureFileSystemHelper.logAllLiveStackTraces();
|
|
|
- throw new IOException("Timed out waiting for IO requests to finish");
|
|
|
- }
|
|
|
- } catch(InterruptedException intrEx) {
|
|
|
|
|
|
- // Restore the interrupted status
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
|
|
|
- throw new IOException("Append Commit interrupted.");
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Flushes this output stream and forces any buffered output bytes to be
|
|
|
+ * written out. If any data remains in the payload it is committed to the
|
|
|
+ * service. Data is queued for writing and forced out to the service
|
|
|
+ * before the call returns.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void flush() throws IOException {
|
|
|
|
|
|
- // Calling commit after all blocks are succesfully uploaded.
|
|
|
- if (lastError == null) {
|
|
|
- commitAppendBlocks();
|
|
|
+ if (closed) {
|
|
|
+ // calling close() after the stream is closed starts with call to flush()
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
- // Perform cleanup.
|
|
|
- cleanup();
|
|
|
+ addBlockUploadCommand();
|
|
|
|
|
|
- if (lastError != null) {
|
|
|
- throw lastError;
|
|
|
+ if (committedBlobLength.get() < blobLength) {
|
|
|
+ try {
|
|
|
+ // wait until the block list is committed
|
|
|
+ addFlushCommand().await();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method that cleans up the append stream.
|
|
|
+ * Force all data in the output stream to be written to Azure storage.
|
|
|
+ * Wait to return until this is complete.
|
|
|
*/
|
|
|
- private synchronized void cleanup() {
|
|
|
-
|
|
|
- closed = true;
|
|
|
-
|
|
|
- try {
|
|
|
- // Set the value of append lease to false if the value is set to true.
|
|
|
- updateBlobAppendMetadata(false, true);
|
|
|
- } catch(StorageException ex) {
|
|
|
- LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
|
|
|
- + "Error Code : {}",
|
|
|
- key, ex, ex.getErrorCode());
|
|
|
- lastError = new IOException(ex);
|
|
|
+ @Override
|
|
|
+ public void hsync() throws IOException {
|
|
|
+ // when block compaction is disabled, hsync is empty function
|
|
|
+ if (compactionEnabled) {
|
|
|
+ flush();
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- leaseFreed = true;
|
|
|
+ /**
|
|
|
+ * Force all data in the output stream to be written to Azure storage.
|
|
|
+ * Wait to return until this is complete.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void hflush() throws IOException {
|
|
|
+ // when block compaction is disabled, hflush is empty function
|
|
|
+ if (compactionEnabled) {
|
|
|
+ flush();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Method to commit all the uncommited blocks to azure storage.
|
|
|
- * If the commit fails then blocks are automatically cleaned up
|
|
|
- * by Azure storage.
|
|
|
- * @throws IOException
|
|
|
+ * The Synchronization capabilities of this stream depend upon the compaction
|
|
|
+ * policy.
|
|
|
+ * @param capability string to query the stream support for.
|
|
|
+ * @return true for hsync and hflush when compaction is enabled.
|
|
|
*/
|
|
|
- private synchronized void commitAppendBlocks() throws IOException {
|
|
|
+ @Override
|
|
|
+ public boolean hasCapability(String capability) {
|
|
|
+ return compactionEnabled
|
|
|
+ && (capability.equalsIgnoreCase(HSYNC.getValue())
|
|
|
+ || capability.equalsIgnoreCase((HFLUSH.getValue())));
|
|
|
+ }
|
|
|
|
|
|
- SelfRenewingLease lease = null;
|
|
|
+ /**
|
|
|
+ * Force all data in the output stream to be written to Azure storage.
|
|
|
+ * Wait to return until this is complete. Close the access to the stream and
|
|
|
+ * shutdown the upload thread pool.
|
|
|
+ * If the blob was created, its lease will be released.
|
|
|
+ * Any error encountered caught in threads and stored will be rethrown here
|
|
|
+ * after cleanup.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public synchronized void close() throws IOException {
|
|
|
|
|
|
- try {
|
|
|
- if (uncommittedBlockEntries.size() > 0) {
|
|
|
+ LOG.debug("close {} ", key);
|
|
|
|
|
|
- //Acquiring lease on the blob.
|
|
|
- lease = new SelfRenewingLease(blob);
|
|
|
+ if (closed) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // Downloading existing blocks
|
|
|
- List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
|
|
|
- new BlobRequestOptions(), opContext);
|
|
|
+ // Upload the last block regardless of compactionEnabled flag
|
|
|
+ flush();
|
|
|
|
|
|
- // Adding uncommitted blocks.
|
|
|
- blockEntries.addAll(uncommittedBlockEntries);
|
|
|
+ // Initiates an orderly shutdown in which previously submitted tasks are
|
|
|
+ // executed.
|
|
|
+ ioThreadPool.shutdown();
|
|
|
|
|
|
- AccessCondition accessCondition = new AccessCondition();
|
|
|
- accessCondition.setLeaseID(lease.getLeaseID());
|
|
|
- blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
|
|
|
- uncommittedBlockEntries.clear();
|
|
|
+ try {
|
|
|
+ // wait up to CLOSE_UPLOAD_DELAY minutes to upload all the blocks
|
|
|
+ if (!ioThreadPool.awaitTermination(CLOSE_UPLOAD_DELAY, TimeUnit.MINUTES)) {
|
|
|
+ LOG.error("Time out occurred while close() is waiting for IO request to"
|
|
|
+ + " finish in append"
|
|
|
+ + " for blob : {}",
|
|
|
+ key);
|
|
|
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
|
|
|
+ throw new AzureException("Timed out waiting for IO requests to finish");
|
|
|
}
|
|
|
- } catch(StorageException ex) {
|
|
|
- LOG.error("Storage exception encountered during block commit phase of append for blob"
|
|
|
- + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
|
|
|
- throw new IOException("Encountered Exception while committing append blocks", ex);
|
|
|
- } finally {
|
|
|
- if (lease != null) {
|
|
|
+ } catch(InterruptedException ex) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+
|
|
|
+ // release the lease
|
|
|
+ if (firstError.get() == null && blobExist) {
|
|
|
try {
|
|
|
lease.free();
|
|
|
- } catch(StorageException ex) {
|
|
|
- LOG.debug("Exception encountered while releasing lease for "
|
|
|
- + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
|
|
|
- // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
|
|
|
+ } catch (StorageException ex) {
|
|
|
+ LOG.debug("Lease free update blob {} encountered Storage Exception:"
|
|
|
+ + " {} Error Code : {}",
|
|
|
+ key,
|
|
|
+ ex,
|
|
|
+ ex.getErrorCode());
|
|
|
+ maybeSetFirstError(new AzureException(ex));
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
+ closed = true;
|
|
|
+
|
|
|
+ // finally, throw the first exception raised if it has not
|
|
|
+ // been thrown elsewhere.
|
|
|
+ if (firstError.get() != null && !firstErrorThrown) {
|
|
|
+ throw firstError.get();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
|
|
|
- * storage SDK.
|
|
|
+ * Helper method used to generate the blockIDs. The algorithm used is similar
|
|
|
+ * to the Azure storage SDK.
|
|
|
*/
|
|
|
- private void setBlocksCountAndBlockIdPrefix() throws IOException {
|
|
|
+ private void setBlocksCountAndBlockIdPrefix(List<BlockEntry> blockEntries) {
|
|
|
|
|
|
- try {
|
|
|
+ if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix == null) {
|
|
|
|
|
|
- if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) {
|
|
|
+ Random sequenceGenerator = new Random();
|
|
|
|
|
|
- List<BlockEntry> blockEntries =
|
|
|
- blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
|
|
|
+ String blockZeroBlockId = (!blockEntries.isEmpty())
|
|
|
+ ? blockEntries.get(0).getId()
|
|
|
+ : "";
|
|
|
+ String prefix = UUID.randomUUID().toString() + "-";
|
|
|
+ String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix,
|
|
|
+ 0);
|
|
|
|
|
|
- String blockZeroBlockId = (blockEntries.size() > 0) ? blockEntries.get(0).getId() : "";
|
|
|
- String prefix = UUID.randomUUID().toString() + "-";
|
|
|
- String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0);
|
|
|
+ if (!blockEntries.isEmpty()
|
|
|
+ && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
|
|
|
|
|
|
- if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
|
|
|
+ // If blob has already been created with 2.2.0, append subsequent blocks
|
|
|
+ // with older version (2.2.0) blockId compute nextBlockCount, the way it
|
|
|
+ // was done before; and don't use blockIdPrefix
|
|
|
+ this.blockIdPrefix = "";
|
|
|
+ nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
|
|
|
+ + sequenceGenerator.nextInt(
|
|
|
+ Integer.MAX_VALUE - MAX_BLOCK_COUNT);
|
|
|
+ nextBlockCount += blockEntries.size();
|
|
|
|
|
|
- // If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId
|
|
|
- // compute nextBlockCount, the way it was done before; and don't use blockIdPrefix
|
|
|
- this.blockIdPrefix = "";
|
|
|
- nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
|
|
|
- + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
|
|
|
- nextBlockCount += blockEntries.size();
|
|
|
-
|
|
|
- } else {
|
|
|
-
|
|
|
- // If there are no existing blocks, create the first block with newer version (4.2.0) blockId
|
|
|
- // If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId
|
|
|
- this.blockIdPrefix = prefix;
|
|
|
- nextBlockCount = blockEntries.size();
|
|
|
-
|
|
|
- }
|
|
|
+ } else {
|
|
|
|
|
|
+ // If there are no existing blocks, create the first block with newer
|
|
|
+ // version (4.2.0) blockId. If blob has already been created with 4.2.0,
|
|
|
+ // append subsequent blocks with newer version (4.2.0) blockId
|
|
|
+ this.blockIdPrefix = prefix;
|
|
|
+ nextBlockCount = blockEntries.size();
|
|
|
}
|
|
|
-
|
|
|
- } catch (StorageException ex) {
|
|
|
- LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix."
|
|
|
- + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
|
|
|
- throw new IOException(ex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method that generates the next block id for uploading a block to azure storage.
|
|
|
+ * Helper method that generates the next block id for uploading a block to
|
|
|
+ * azure storage.
|
|
|
* @return String representing the block ID generated.
|
|
|
- * @throws IOException
|
|
|
+ * @throws IOException if the stream is in invalid state
|
|
|
*/
|
|
|
private String generateBlockId() throws IOException {
|
|
|
|
|
|
- if (nextBlockCount == UNSET_BLOCKS_COUNT) {
|
|
|
- throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
|
|
|
- }
|
|
|
-
|
|
|
- if (this.blockIdPrefix == null) {
|
|
|
- throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly");
|
|
|
- }
|
|
|
-
|
|
|
- if (!this.blockIdPrefix.equals("")) {
|
|
|
-
|
|
|
- return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++);
|
|
|
-
|
|
|
- } else {
|
|
|
-
|
|
|
- return generateOlderVersionBlockId(nextBlockCount++);
|
|
|
-
|
|
|
+ if (nextBlockCount == UNSET_BLOCKS_COUNT || blockIdPrefix == null) {
|
|
|
+ throw new AzureException(
|
|
|
+ "Append Stream in invalid state. nextBlockCount not set correctly");
|
|
|
}
|
|
|
|
|
|
+ return (!blockIdPrefix.isEmpty())
|
|
|
+ ? generateNewerVersionBlockId(blockIdPrefix, nextBlockCount++)
|
|
|
+ : generateOlderVersionBlockId(nextBlockCount++);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method that generates an older (2.2.0) version blockId
|
|
|
+ * Helper method that generates an older (2.2.0) version blockId.
|
|
|
* @return String representing the block ID generated.
|
|
|
*/
|
|
|
private String generateOlderVersionBlockId(long id) {
|
|
|
|
|
|
- byte[] blockIdInBytes = getBytesFromLong(id);
|
|
|
- return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
|
|
|
+ byte[] blockIdInBytes = new byte[8];
|
|
|
+ for (int m = 0; m < 8; m++) {
|
|
|
+ blockIdInBytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF);
|
|
|
+ }
|
|
|
+
|
|
|
+ return new String(
|
|
|
+ Base64.encodeBase64(blockIdInBytes),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method that generates an newer (4.2.0) version blockId
|
|
|
+ * Helper method that generates an newer (4.2.0) version blockId.
|
|
|
* @return String representing the block ID generated.
|
|
|
*/
|
|
|
private String generateNewerVersionBlockId(String prefix, long id) {
|
|
|
|
|
|
String blockIdSuffix = String.format("%06d", id);
|
|
|
- byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
|
|
|
+ byte[] blockIdInBytes =
|
|
|
+ (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
|
|
|
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns a byte array that represents the data of a <code>long</code> value. This
|
|
|
- * utility method is copied from com.microsoft.azure.storage.core.Utility class.
|
|
|
- * This class is marked as internal, hence we clone the method here and not express
|
|
|
- * dependency on the Utility Class
|
|
|
- *
|
|
|
- * @param value
|
|
|
- * The value from which the byte array will be returned.
|
|
|
- *
|
|
|
- * @return A byte array that represents the data of the specified <code>long</code> value.
|
|
|
+ * This is shared between upload block Runnable and CommitBlockList. The
|
|
|
+ * method captures retry logic
|
|
|
+ * @param blockId block name
|
|
|
+ * @param dataPayload block content
|
|
|
*/
|
|
|
- private static byte[] getBytesFromLong(final long value) {
|
|
|
+ private void writeBlockRequestInternal(String blockId,
|
|
|
+ ByteBuffer dataPayload,
|
|
|
+ boolean bufferPoolBuffer) {
|
|
|
+ IOException lastLocalException = null;
|
|
|
+
|
|
|
+ int uploadRetryAttempts = 0;
|
|
|
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
|
|
|
+ try {
|
|
|
+ long startTime = System.nanoTime();
|
|
|
+
|
|
|
+ blob.uploadBlock(blockId, accessCondition, new ByteArrayInputStream(
|
|
|
+ dataPayload.array()), dataPayload.position(),
|
|
|
+ new BlobRequestOptions(), opContext);
|
|
|
|
|
|
- final byte[] tempArray = new byte[8];
|
|
|
+ LOG.debug("upload block finished for {} ms. block {} ",
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(
|
|
|
+ System.nanoTime() - startTime), blockId);
|
|
|
+ break;
|
|
|
+
|
|
|
+ } catch(Exception ioe) {
|
|
|
+ LOG.debug("Encountered exception during uploading block for Blob {}"
|
|
|
+ + " Exception : {}", key, ioe);
|
|
|
+ uploadRetryAttempts++;
|
|
|
+ lastLocalException = new AzureException(
|
|
|
+ "Encountered Exception while uploading block: " + ioe, ioe);
|
|
|
+ try {
|
|
|
+ Thread.sleep(
|
|
|
+ BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
|
|
|
+ } catch(InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- for (int m = 0; m < 8; m++) {
|
|
|
- tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
|
|
|
+ if (bufferPoolBuffer) {
|
|
|
+ poolReadyByteBuffers.putBuffer(dataPayload);
|
|
|
}
|
|
|
|
|
|
- return tempArray;
|
|
|
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
|
|
|
+ maybeSetFirstError(lastLocalException);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Helper method that creates a thread to upload a block to azure storage.
|
|
|
- * @param payload
|
|
|
- * @throws IOException
|
|
|
+ * Set {@link #firstError} to the exception if it is not already set.
|
|
|
+ * @param exception exception to save
|
|
|
*/
|
|
|
- private synchronized void uploadBlockToStorage(byte[] payload)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- // upload payload to azure storage
|
|
|
- String blockId = generateBlockId();
|
|
|
-
|
|
|
- // Since uploads of the Azure storage are done in parallel threads, we go ahead
|
|
|
- // add the blockId in the uncommitted list. If the upload of the block fails
|
|
|
- // we don't commit the blockIds.
|
|
|
- BlockEntry blockEntry = new BlockEntry(blockId);
|
|
|
- blockEntry.setSize(payload.length);
|
|
|
- uncommittedBlockEntries.add(blockEntry);
|
|
|
- ioThreadPool.execute(new WriteRequest(payload, blockId));
|
|
|
+ private void maybeSetFirstError(IOException exception) {
|
|
|
+ firstError.compareAndSet(null, exception);
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * Helper method to updated the Blob metadata during Append lease operations.
|
|
|
- * Blob metadata is updated to holdLease value only if the current lease
|
|
|
- * status is equal to testCondition and the last update on the blob metadata
|
|
|
- * is less that 30 secs old.
|
|
|
- * @param holdLease
|
|
|
- * @param testCondition
|
|
|
- * @return true if the updated lease operation was successful or false otherwise
|
|
|
- * @throws StorageException
|
|
|
+ * Throw the first error caught if it has not been raised already
|
|
|
+ * @throws IOException if one is caught and needs to be thrown.
|
|
|
*/
|
|
|
- private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
|
|
|
- throws StorageException {
|
|
|
-
|
|
|
- SelfRenewingLease lease = null;
|
|
|
- StorageException lastStorageException = null;
|
|
|
- int leaseRenewalRetryCount = 0;
|
|
|
-
|
|
|
- /*
|
|
|
- * Updating the Blob metadata honours following algorithm based on
|
|
|
- * 1) If the append lease metadata is present
|
|
|
- * 2) Last updated time of the append lease
|
|
|
- * 3) Previous value of the Append lease metadata.
|
|
|
- *
|
|
|
- * The algorithm:
|
|
|
- * 1) If append lease metadata is not part of the Blob. In this case
|
|
|
- * this is the first client to Append so we update the metadata.
|
|
|
- * 2) If append lease metadata is present and timeout has occurred.
|
|
|
- * In this case irrespective of what the value of the append lease is we update the metadata.
|
|
|
- * 3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
|
|
|
- * and timeout has not occurred, we update the metadata.
|
|
|
- * 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
|
|
|
- * and timeout has not occurred, we do not update metadata and return false.
|
|
|
- *
|
|
|
- */
|
|
|
- while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
|
|
|
-
|
|
|
- lastStorageException = null;
|
|
|
-
|
|
|
- synchronized(this) {
|
|
|
- try {
|
|
|
-
|
|
|
- final Calendar currentCalendar = Calendar
|
|
|
- .getInstance(Locale.US);
|
|
|
- currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
|
|
|
- long currentTime = currentCalendar.getTime().getTime();
|
|
|
+ private void maybeThrowFirstError() throws IOException {
|
|
|
+ if (firstError.get() != null) {
|
|
|
+ firstErrorThrown = true;
|
|
|
+ throw firstError.get();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // Acquire lease on the blob.
|
|
|
- lease = new SelfRenewingLease(blob);
|
|
|
+ /**
|
|
|
+ * Write block list. The method captures retry logic
|
|
|
+ */
|
|
|
+ private void writeBlockListRequestInternal() {
|
|
|
|
|
|
- blob.downloadAttributes(opContext);
|
|
|
- HashMap<String, String> metadata = blob.getMetadata();
|
|
|
+ IOException lastLocalException = null;
|
|
|
|
|
|
- if (metadata.containsKey(APPEND_LEASE)
|
|
|
- && currentTime - Long.parseLong(
|
|
|
- metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
|
|
|
- && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ int uploadRetryAttempts = 0;
|
|
|
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
|
|
|
+ try {
|
|
|
|
|
|
- metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
|
|
|
- metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
|
|
|
- blob.setMetadata(metadata);
|
|
|
- AccessCondition accessCondition = new AccessCondition();
|
|
|
- accessCondition.setLeaseID(lease.getLeaseID());
|
|
|
- blob.uploadMetadata(accessCondition, null, opContext);
|
|
|
- return true;
|
|
|
+ long startTime = System.nanoTime();
|
|
|
|
|
|
- } catch (StorageException ex) {
|
|
|
-
|
|
|
- lastStorageException = ex;
|
|
|
- LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
|
|
|
- + "Error Code : {}",
|
|
|
- key, ex, ex.getErrorCode());
|
|
|
- leaseRenewalRetryCount++;
|
|
|
-
|
|
|
- } finally {
|
|
|
-
|
|
|
- if (lease != null) {
|
|
|
- try {
|
|
|
- lease.free();
|
|
|
- } catch(StorageException ex) {
|
|
|
- LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
|
|
|
- + "during Append metadata operation. Storage Exception {} "
|
|
|
- + "Error Code : {} ", key, ex, ex.getErrorCode());
|
|
|
- } finally {
|
|
|
- lease = null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ blob.commitBlockList(blockEntries, accessCondition,
|
|
|
+ new BlobRequestOptions(), opContext);
|
|
|
|
|
|
- if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
|
|
|
- throw lastStorageException;
|
|
|
- } else {
|
|
|
+ LOG.debug("Upload block list took {} ms for blob {} ",
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(
|
|
|
+ System.nanoTime() - startTime), key);
|
|
|
+ break;
|
|
|
+
|
|
|
+ } catch(Exception ioe) {
|
|
|
+ LOG.debug("Encountered exception during uploading block for Blob {}"
|
|
|
+ + " Exception : {}", key, ioe);
|
|
|
+ uploadRetryAttempts++;
|
|
|
+ lastLocalException = new AzureException(
|
|
|
+ "Encountered Exception while uploading block: " + ioe, ioe);
|
|
|
try {
|
|
|
- Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
|
|
|
- } catch(InterruptedException ex) {
|
|
|
- LOG.debug("Blob append metadata updated method interrupted");
|
|
|
+ Thread.sleep(
|
|
|
+ BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
|
|
|
+ } catch(InterruptedException ie) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // The code should not enter here because the while loop will
|
|
|
- // always be executed and if the while loop is executed we
|
|
|
- // would returning from the while loop.
|
|
|
- return false;
|
|
|
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
|
|
|
+ maybeSetFirstError(lastLocalException);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
|
|
|
- * @param data
|
|
|
- * @param offset
|
|
|
- * @param length
|
|
|
- * @throws IOException
|
|
|
+ * A ThreadFactory that creates uploader thread with
|
|
|
+ * meaningful names helpful for debugging purposes.
|
|
|
*/
|
|
|
- private synchronized void writeInternal(final byte[] data, final int offset, final int length)
|
|
|
- throws IOException {
|
|
|
+ class UploaderThreadFactory implements ThreadFactory {
|
|
|
|
|
|
- if (!initialized) {
|
|
|
- throw new IOException("Trying to write to an un-initialized Append stream");
|
|
|
+ @Override
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
+ Thread t = new Thread(r);
|
|
|
+ t.setName(String.format("%s-%d", THREAD_ID_PREFIX,
|
|
|
+ threadSequenceNumber.getAndIncrement()));
|
|
|
+ return t;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (closed) {
|
|
|
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Upload block commands.
|
|
|
+ */
|
|
|
+ private class UploadBlockCommand extends UploadCommand {
|
|
|
|
|
|
- if (leaseFreed) {
|
|
|
- throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
|
|
|
- }
|
|
|
+ // the block content for upload
|
|
|
+ private final ByteBuffer payload;
|
|
|
|
|
|
- byte[] currentData = new byte[length];
|
|
|
- System.arraycopy(data, offset, currentData, 0, length);
|
|
|
+ // description of the block
|
|
|
+ private final BlockEntry entry;
|
|
|
|
|
|
- // check to see if the data to be appended exceeds the
|
|
|
- // buffer size. If so we upload a block to azure storage.
|
|
|
- while ((outBuffer.size() + currentData.length) > bufferSize) {
|
|
|
+ UploadBlockCommand(String blockId, ByteBuffer payload) {
|
|
|
|
|
|
- byte[] payload = new byte[bufferSize];
|
|
|
+ super(blobLength);
|
|
|
|
|
|
- // Add data from the existing buffer
|
|
|
- System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
|
|
|
+ BlockEntry blockEntry = new BlockEntry(blockId);
|
|
|
+ blockEntry.setSize(payload.position());
|
|
|
+ blockEntry.setSearchMode(BlockSearchMode.LATEST);
|
|
|
|
|
|
- // Updating the available size in the payload
|
|
|
- int availableSpaceInPayload = bufferSize - outBuffer.size();
|
|
|
+ this.payload = payload;
|
|
|
+ this.entry = blockEntry;
|
|
|
|
|
|
- // Adding data from the current call
|
|
|
- System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
|
|
|
+ uncommittedBlockEntries.add(blockEntry);
|
|
|
+ }
|
|
|
|
|
|
- uploadBlockToStorage(payload);
|
|
|
+ /**
|
|
|
+ * Execute command.
|
|
|
+ */
|
|
|
+ void execute() throws InterruptedException {
|
|
|
+
|
|
|
+ uploadingSemaphore.acquire(1);
|
|
|
+ writeBlockRequestInternal(entry.getId(), payload, true);
|
|
|
+ uploadingSemaphore.release(1);
|
|
|
|
|
|
- // updating the currentData buffer
|
|
|
- byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
|
|
|
- System.arraycopy(currentData, availableSpaceInPayload,
|
|
|
- tempBuffer, 0, currentData.length - availableSpaceInPayload);
|
|
|
- currentData = tempBuffer;
|
|
|
- outBuffer = new ByteArrayOutputStream(bufferSize);
|
|
|
}
|
|
|
|
|
|
- outBuffer.write(currentData);
|
|
|
+ void dump() {
|
|
|
+ LOG.debug("upload block {} size: {} for blob {}",
|
|
|
+ entry.getId(),
|
|
|
+ entry.getSize(),
|
|
|
+ key);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Runnable instance that uploads the block of data to azure storage.
|
|
|
- *
|
|
|
- *
|
|
|
+ * Upload blob block list commands.
|
|
|
*/
|
|
|
- private class WriteRequest implements Runnable {
|
|
|
- private final byte[] dataPayload;
|
|
|
- private final String blockId;
|
|
|
+ private class UploadBlockListCommand extends UploadCommand {
|
|
|
+
|
|
|
+ private BlockEntry lastBlock = null;
|
|
|
+
|
|
|
+ UploadBlockListCommand() {
|
|
|
+ super(blobLength);
|
|
|
|
|
|
- public WriteRequest(byte[] dataPayload, String blockId) {
|
|
|
- this.dataPayload = dataPayload;
|
|
|
- this.blockId = blockId;
|
|
|
+ if (!uncommittedBlockEntries.isEmpty()) {
|
|
|
+ lastBlock = uncommittedBlockEntries.getLast();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
+ void awaitAsDependent() throws InterruptedException {
|
|
|
+ // empty. later commit block does not need to wait previous commit block
|
|
|
+ // lists.
|
|
|
+ }
|
|
|
|
|
|
- int uploadRetryAttempts = 0;
|
|
|
- IOException lastLocalException = null;
|
|
|
- while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
|
|
|
- try {
|
|
|
+ void dump() {
|
|
|
+ LOG.debug("commit block list with {} blocks for blob {}",
|
|
|
+ uncommittedBlockEntries.size(), key);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Execute command.
|
|
|
+ */
|
|
|
+ public void execute() throws InterruptedException, IOException {
|
|
|
+
|
|
|
+ if (committedBlobLength.get() >= getCommandBlobOffset()) {
|
|
|
+ LOG.debug("commit already applied for {}", key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (lastBlock == null) {
|
|
|
+ LOG.debug("nothing to commit for {}", key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.debug("active commands: {} for {}", activeBlockCommands.size(), key);
|
|
|
+
|
|
|
+ for (UploadCommand activeCommand : activeBlockCommands) {
|
|
|
+ if (activeCommand.getCommandBlobOffset() < getCommandBlobOffset()) {
|
|
|
+ activeCommand.dump();
|
|
|
+ activeCommand.awaitAsDependent();
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // stop all uploads until the block list is committed
|
|
|
+ uploadingSemaphore.acquire(MAX_NUMBER_THREADS_IN_THREAD_POOL);
|
|
|
+
|
|
|
+ BlockEntry uncommittedBlock;
|
|
|
+ do {
|
|
|
+ uncommittedBlock = uncommittedBlockEntries.poll();
|
|
|
+ blockEntries.add(uncommittedBlock);
|
|
|
+ } while (uncommittedBlock != lastBlock);
|
|
|
+
|
|
|
+ if (blockEntries.size() > activateCompactionBlockCount) {
|
|
|
+ LOG.debug("Block compaction: activated with {} blocks for {}",
|
|
|
+ blockEntries.size(), key);
|
|
|
+
|
|
|
+ // Block compaction
|
|
|
+ long startCompaction = System.nanoTime();
|
|
|
+ blockCompaction();
|
|
|
+ LOG.debug("Block compaction finished for {} ms with {} blocks for {}",
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(
|
|
|
+ System.nanoTime() - startCompaction),
|
|
|
+ blockEntries.size(), key);
|
|
|
+ }
|
|
|
+
|
|
|
+ writeBlockListRequestInternal();
|
|
|
+
|
|
|
+ uploadingSemaphore.release(MAX_NUMBER_THREADS_IN_THREAD_POOL);
|
|
|
|
|
|
- blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
|
|
|
- dataPayload.length, new BlobRequestOptions(), opContext);
|
|
|
+ // remove blocks previous commands
|
|
|
+ for (Iterator<UploadCommand> it = activeBlockCommands.iterator();
|
|
|
+ it.hasNext();) {
|
|
|
+ UploadCommand activeCommand = it.next();
|
|
|
+ if (activeCommand.getCommandBlobOffset() <= getCommandBlobOffset()) {
|
|
|
+ it.remove();
|
|
|
+ } else {
|
|
|
break;
|
|
|
- } catch(Exception ioe) {
|
|
|
- Log.getLog().debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
|
|
|
- uploadRetryAttempts++;
|
|
|
- lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
|
|
|
- try {
|
|
|
- Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
|
|
|
- } catch(InterruptedException ie) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ committedBlobLength.set(getCommandBlobOffset());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Internal output stream with read access to the internal buffer.
|
|
|
+ */
|
|
|
+ private class ByteArrayOutputStreamInternal extends ByteArrayOutputStream {
|
|
|
+
|
|
|
+ ByteArrayOutputStreamInternal(int size) {
|
|
|
+ super(size);
|
|
|
+ }
|
|
|
+
|
|
|
+ byte[] getByteArray() {
|
|
|
+ return buf;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Block compaction process.
|
|
|
+ *
|
|
|
+ * Block compaction is only enabled when the number of blocks exceeds
|
|
|
+ * activateCompactionBlockCount. The algorithm searches for the longest
|
|
|
+ * segment [b..e) where (e-b) > 2 && |b| + |b+1| ... |e-1| < maxBlockSize
|
|
|
+ * such that size(b1) + size(b2) + ... + size(bn) < maximum-block-size.
|
|
|
+ * It then downloads the blocks in the sequence, concatenates the data to
|
|
|
+ * form a single block, uploads this new block, and updates the block
|
|
|
+ * list to replace the sequence of blocks with the new block.
|
|
|
+ */
|
|
|
+ private void blockCompaction() throws IOException {
|
|
|
+ //current segment [segmentBegin, segmentEnd) and file offset/size of the
|
|
|
+ // current segment
|
|
|
+ int segmentBegin = 0, segmentEnd = 0;
|
|
|
+ long segmentOffsetBegin = 0, segmentOffsetEnd = 0;
|
|
|
+
|
|
|
+ //longest segment [maxSegmentBegin, maxSegmentEnd) and file offset/size of
|
|
|
+ // the longest segment
|
|
|
+ int maxSegmentBegin = 0, maxSegmentEnd = 0;
|
|
|
+ long maxSegmentOffsetBegin = 0, maxSegmentOffsetEnd = 0;
|
|
|
+
|
|
|
+ for (BlockEntry block : blockEntries) {
|
|
|
+ segmentEnd++;
|
|
|
+ segmentOffsetEnd += block.getSize();
|
|
|
+ if (segmentOffsetEnd - segmentOffsetBegin > maxBlockSize.get()) {
|
|
|
+ if (segmentEnd - segmentBegin > 2) {
|
|
|
+ if (maxSegmentEnd - maxSegmentBegin < segmentEnd - segmentBegin) {
|
|
|
+ maxSegmentBegin = segmentBegin;
|
|
|
+ maxSegmentEnd = segmentEnd;
|
|
|
+ maxSegmentOffsetBegin = segmentOffsetBegin;
|
|
|
+ maxSegmentOffsetEnd = segmentOffsetEnd - block.getSize();
|
|
|
+ }
|
|
|
}
|
|
|
+ segmentBegin = segmentEnd - 1;
|
|
|
+ segmentOffsetBegin = segmentOffsetEnd - block.getSize();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
|
|
|
- lastError = lastLocalException;
|
|
|
+ if (maxSegmentEnd - maxSegmentBegin > 1) {
|
|
|
+
|
|
|
+ LOG.debug("Block compaction: {} blocks for {}",
|
|
|
+ maxSegmentEnd - maxSegmentBegin, key);
|
|
|
+
|
|
|
+ // download synchronously all the blocks from the azure storage
|
|
|
+ ByteArrayOutputStreamInternal blockOutputStream
|
|
|
+ = new ByteArrayOutputStreamInternal(maxBlockSize.get());
|
|
|
+
|
|
|
+ try {
|
|
|
+ long length = maxSegmentOffsetEnd - maxSegmentOffsetBegin;
|
|
|
+ blob.downloadRange(maxSegmentOffsetBegin, length, blockOutputStream,
|
|
|
+ new BlobRequestOptions(), opContext);
|
|
|
+ } catch(StorageException ex) {
|
|
|
+ LOG.error(
|
|
|
+ "Storage exception encountered during block compaction phase"
|
|
|
+ + " : {} Storage Exception : {} Error Code: {}",
|
|
|
+ key, ex, ex.getErrorCode());
|
|
|
+ throw new AzureException(
|
|
|
+ "Encountered Exception while committing append blocks " + ex, ex);
|
|
|
+ }
|
|
|
+
|
|
|
+ // upload synchronously new block to the azure storage
|
|
|
+ String blockId = generateBlockId();
|
|
|
+
|
|
|
+ ByteBuffer byteBuffer = ByteBuffer.wrap(
|
|
|
+ blockOutputStream.getByteArray());
|
|
|
+ byteBuffer.position(blockOutputStream.size());
|
|
|
+
|
|
|
+ writeBlockRequestInternal(blockId, byteBuffer, false);
|
|
|
+
|
|
|
+ // replace blocks from the longest segment with new block id
|
|
|
+ blockEntries.subList(maxSegmentBegin + 1, maxSegmentEnd - 1).clear();
|
|
|
+ BlockEntry newBlock = blockEntries.get(maxSegmentBegin);
|
|
|
+ newBlock.setId(blockId);
|
|
|
+ newBlock.setSearchMode(BlockSearchMode.LATEST);
|
|
|
+ newBlock.setSize(maxSegmentOffsetEnd - maxSegmentOffsetBegin);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * A ThreadFactory that creates uploader thread with
|
|
|
- * meaningful names helpful for debugging purposes.
|
|
|
+ * Prepare block upload command and queue the command in thread pool executor.
|
|
|
*/
|
|
|
- class UploaderThreadFactory implements ThreadFactory {
|
|
|
+ private synchronized void addBlockUploadCommand() throws IOException {
|
|
|
+
|
|
|
+ maybeThrowFirstError();
|
|
|
+
|
|
|
+ if (blobExist && lease.isFreed()) {
|
|
|
+ throw new AzureException(String.format(
|
|
|
+ "Attempting to upload a block on blob : %s "
|
|
|
+ + " that does not have lease on the Blob. Failing upload", key));
|
|
|
+ }
|
|
|
+
|
|
|
+ int blockSize = outBuffer.position();
|
|
|
+ if (blockSize > 0) {
|
|
|
+ UploadCommand command = new UploadBlockCommand(generateBlockId(),
|
|
|
+ outBuffer);
|
|
|
+ activeBlockCommands.add(command);
|
|
|
+
|
|
|
+ blobLength += blockSize;
|
|
|
+ outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
|
|
|
+
|
|
|
+ ioThreadPool.execute(new WriteRequest(command));
|
|
|
|
|
|
- @Override
|
|
|
- public Thread newThread(Runnable r) {
|
|
|
- Thread t = new Thread(r);
|
|
|
- t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
|
|
|
- threadSequenceNumber.getAndIncrement()));
|
|
|
- return t;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * A deamon thread that renews the Append lease on the blob.
|
|
|
- * The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing
|
|
|
- * the lease. If an error is encountered while renewing the lease
|
|
|
- * then an lease is released by this thread, which fails all other
|
|
|
- * operations.
|
|
|
+ * Prepare block list commit command and queue the command in thread pool
|
|
|
+ * executor.
|
|
|
*/
|
|
|
- private class AppendRenewer implements Runnable {
|
|
|
+ private synchronized UploadCommand addFlushCommand() throws IOException {
|
|
|
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
+ maybeThrowFirstError();
|
|
|
|
|
|
- while (!leaseFreed) {
|
|
|
+ if (blobExist && lease.isFreed()) {
|
|
|
+ throw new AzureException(
|
|
|
+ String.format("Attempting to upload block list on blob : %s"
|
|
|
+ + " that does not have lease on the Blob. Failing upload", key));
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- Thread.sleep(LEASE_RENEWAL_PERIOD);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.debug("Appender Renewer thread interrupted");
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- }
|
|
|
+ UploadCommand command = new UploadBlockListCommand();
|
|
|
+ activeBlockCommands.add(command);
|
|
|
|
|
|
- Log.getLog().debug("Attempting to renew append lease on {}", key);
|
|
|
+ ioThreadPool.execute(new WriteRequest(command));
|
|
|
|
|
|
- try {
|
|
|
- if (!leaseFreed) {
|
|
|
- // Update the blob metadata to renew the append lease
|
|
|
- if (!updateBlobAppendMetadata(true, true)) {
|
|
|
- LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
|
|
|
- leaseFreed = true;
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (StorageException ex) {
|
|
|
+ return command;
|
|
|
+ }
|
|
|
|
|
|
- LOG.debug("Lease renewal for Blob : {} encountered "
|
|
|
- + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
|
|
|
+ /**
|
|
|
+ * Runnable instance that uploads the block of data to azure storage.
|
|
|
+ */
|
|
|
+ private class WriteRequest implements Runnable {
|
|
|
+ private final UploadCommand command;
|
|
|
|
|
|
- // We swallow the exception here because if the blob metadata is not updated for
|
|
|
- // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
|
|
|
- // continue forward if it needs to append.
|
|
|
- leaseFreed = true;
|
|
|
- }
|
|
|
+ WriteRequest(UploadCommand command) {
|
|
|
+ this.command = command;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+
|
|
|
+ try {
|
|
|
+ command.dump();
|
|
|
+ long startTime = System.nanoTime();
|
|
|
+ command.execute();
|
|
|
+ command.setCompleted();
|
|
|
+ LOG.debug("command finished for {} ms",
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(
|
|
|
+ System.nanoTime() - startTime));
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.debug(
|
|
|
+ "Encountered exception during execution of command for Blob :"
|
|
|
+ + " {} Exception : {}", key, ex);
|
|
|
+ firstError.compareAndSet(null, new AzureException(ex));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-}
|
|
|
+}
|