|
@@ -641,7 +641,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
// wait till all the healthy streamers to
|
|
|
// 1) get the updated block info
|
|
|
// 2) create new block outputstream
|
|
|
- newFailed = waitCreatingNewStreams(healthySet);
|
|
|
+ newFailed = waitCreatingStreamers(healthySet);
|
|
|
if (newFailed.size() + failedStreamers.size() >
|
|
|
numAllBlocks - numDataBlocks) {
|
|
|
throw new IOException(
|
|
@@ -668,6 +668,14 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check if the streamers were successfully updated, adding failed streamers
|
|
|
+ * to the <i>failed</i> return parameter.
|
|
|
+ * @param failed Return parameter containing failed streamers from
|
|
|
+ * <i>streamers</i>.
|
|
|
+ * @param streamers Set of streamers that are being updated
|
|
|
+ * @return total number of successful updates and failures
|
|
|
+ */
|
|
|
private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
|
|
|
Set<StripedDataStreamer> streamers) {
|
|
|
for (StripedDataStreamer streamer : streamers) {
|
|
@@ -682,7 +690,15 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
return coordinator.updateStreamerMap.size() + failed.size();
|
|
|
}
|
|
|
|
|
|
- private Set<StripedDataStreamer> waitCreatingNewStreams(
|
|
|
+ /**
|
|
|
+ * Waits for streamers to be created.
|
|
|
+ *
|
|
|
+ * @param healthyStreamers Set of healthy streamers
|
|
|
+ * @return Set of streamers that failed.
|
|
|
+ *
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private Set<StripedDataStreamer> waitCreatingStreamers(
|
|
|
Set<StripedDataStreamer> healthyStreamers) throws IOException {
|
|
|
Set<StripedDataStreamer> failed = new HashSet<>();
|
|
|
final int expectedNum = healthyStreamers.size();
|
|
@@ -773,9 +789,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // should update the block group length based on the acked length
|
|
|
+ // Update the NameNode with the acked length of the block group
|
|
|
+ // Save and restore the unacked length
|
|
|
final long sentBytes = currentBlockGroup.getNumBytes();
|
|
|
- final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks;
|
|
|
+ final long ackedBytes = getAckedLength();
|
|
|
Preconditions.checkState(ackedBytes <= sentBytes,
|
|
|
"Acked:" + ackedBytes + ", Sent:" + sentBytes);
|
|
|
currentBlockGroup.setNumBytes(ackedBytes);
|
|
@@ -787,23 +804,140 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the number of acked stripes. An acked stripe means at least data block
|
|
|
- * number size cells of the stripe were acked.
|
|
|
+ * Return the length of each block in the block group.
|
|
|
+ * Unhealthy blocks have a length of -1.
|
|
|
+ *
|
|
|
+ * @return List of block lengths.
|
|
|
*/
|
|
|
- private long getNumAckedStripes() {
|
|
|
- int minStripeNum = Integer.MAX_VALUE;
|
|
|
+ private List<Long> getBlockLengths() {
|
|
|
+ List<Long> blockLengths = new ArrayList<>(numAllBlocks);
|
|
|
for (int i = 0; i < numAllBlocks; i++) {
|
|
|
final StripedDataStreamer streamer = getStripedDataStreamer(i);
|
|
|
+ long numBytes = -1;
|
|
|
if (streamer.isHealthy()) {
|
|
|
- int curStripeNum = 0;
|
|
|
if (streamer.getBlock() != null) {
|
|
|
- curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize);
|
|
|
+ numBytes = streamer.getBlock().getNumBytes();
|
|
|
}
|
|
|
- minStripeNum = Math.min(curStripeNum, minStripeNum);
|
|
|
+ }
|
|
|
+ blockLengths.add(numBytes);
|
|
|
+ }
|
|
|
+ return blockLengths;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the length of acked bytes in the block group.
|
|
|
+ *
|
|
|
+ * <p>
|
|
|
+ * A full stripe is acked when at least numDataBlocks streamers have
|
|
|
+ * the corresponding cells of the stripe, and all previous full stripes are
|
|
|
+ * also acked. This enforces the constraint that there is at most one
|
|
|
+ * partial stripe.
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * Partial stripes write all parity cells. Empty data cells are not written.
|
|
|
+ * Parity cells are the length of the longest data cell(s). For example,
|
|
|
+ * with RS(3,2), if we have data cells with lengths [1MB, 64KB, 0], the
|
|
|
+ * parity cells will have lengths [1MB, 1MB].
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * To be considered acked, a partial stripe needs at least numDataBlocks
|
|
|
+ * empty or written cells.
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * Currently, partial stripes can only happen when closing the file at a
|
|
|
+ * non-stripe boundary, but this could also happen during (currently
|
|
|
+ * unimplemented) hflush/hsync support.
|
|
|
+ * </p>
|
|
|
+ */
|
|
|
+ private long getAckedLength() {
|
|
|
+ // Determine the number of full stripes that are sufficiently durable
|
|
|
+ final long sentBytes = currentBlockGroup.getNumBytes();
|
|
|
+ final long numFullStripes = sentBytes / numDataBlocks / cellSize;
|
|
|
+ final long fullStripeLength = numFullStripes * numDataBlocks * cellSize;
|
|
|
+ assert fullStripeLength <= sentBytes : "Full stripe length can't be " +
|
|
|
+ "greater than the block group length";
|
|
|
+
|
|
|
+ long ackedLength = 0;
|
|
|
+
|
|
|
+ // Determine the length contained by at least `numDataBlocks` blocks.
|
|
|
+ // Since it's sorted, all the blocks after `offset` are at least as long,
|
|
|
+ // and there are at least `numDataBlocks` at or after `offset`.
|
|
|
+ List<Long> blockLengths = Collections.unmodifiableList(getBlockLengths());
|
|
|
+ List<Long> sortedBlockLengths = new ArrayList<>(blockLengths);
|
|
|
+ Collections.sort(sortedBlockLengths);
|
|
|
+ if (numFullStripes > 0) {
|
|
|
+ final int offset = sortedBlockLengths.size() - numDataBlocks;
|
|
|
+ ackedLength = sortedBlockLengths.get(offset) * numDataBlocks;
|
|
|
+ }
|
|
|
+
|
|
|
+ // If the acked length is less than the expected full stripe length, then
|
|
|
+ // we're missing a full stripe. Return the acked length.
|
|
|
+ if (ackedLength < fullStripeLength) {
|
|
|
+ return ackedLength;
|
|
|
+ }
|
|
|
+ // If the expected length is exactly a stripe boundary, then we're also done
|
|
|
+ if (ackedLength == sentBytes) {
|
|
|
+ return ackedLength;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ Otherwise, we're potentially dealing with a partial stripe.
|
|
|
+ The partial stripe is laid out as follows:
|
|
|
+
|
|
|
+ 0 or more full data cells, `cellSize` in length.
|
|
|
+ 0 or 1 partial data cells.
|
|
|
+ 0 or more empty data cells.
|
|
|
+ `numParityBlocks` parity cells, the length of the longest data cell.
|
|
|
+
|
|
|
+ If the partial stripe is sufficiently acked, we'll update the ackedLength.
|
|
|
+ */
|
|
|
+
|
|
|
+ // How many full and empty data cells do we expect?
|
|
|
+ final int numFullDataCells = (int)
|
|
|
+ ((sentBytes - fullStripeLength) / cellSize);
|
|
|
+ final int partialLength = (int) (sentBytes - fullStripeLength) % cellSize;
|
|
|
+ final int numPartialDataCells = partialLength == 0 ? 0 : 1;
|
|
|
+ final int numEmptyDataCells = numDataBlocks - numFullDataCells -
|
|
|
+ numPartialDataCells;
|
|
|
+ // Calculate the expected length of the parity blocks.
|
|
|
+ final int parityLength = numFullDataCells > 0 ? cellSize : partialLength;
|
|
|
+
|
|
|
+ final long fullStripeBlockOffset = fullStripeLength / numDataBlocks;
|
|
|
+
|
|
|
+ // For each type of streamer, compute the expected block length.
|
|
|
+ long[] expectedBlockLengths = new long[numAllBlocks];
|
|
|
+ int idx = 0;
|
|
|
+ // Full cells
|
|
|
+ for (; idx < numFullDataCells; idx++) {
|
|
|
+ expectedBlockLengths[idx] = fullStripeBlockOffset + cellSize;
|
|
|
+ }
|
|
|
+ // Partial cell
|
|
|
+ for (; idx < numFullDataCells + numPartialDataCells; idx++) {
|
|
|
+ expectedBlockLengths[idx] = fullStripeBlockOffset + partialLength;
|
|
|
+ }
|
|
|
+ // Empty cells
|
|
|
+ for (; idx < numFullDataCells + numPartialDataCells + numEmptyDataCells;
|
|
|
+ idx++) {
|
|
|
+ expectedBlockLengths[idx] = fullStripeBlockOffset;
|
|
|
+ }
|
|
|
+ // Parity cells
|
|
|
+ for (; idx < numAllBlocks; idx++) {
|
|
|
+ expectedBlockLengths[idx] = fullStripeBlockOffset + parityLength;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check expected lengths against actual streamer lengths.
|
|
|
+ // Update if we have sufficient durability.
|
|
|
+ int numBlocksWithCorrectLength = 0;
|
|
|
+ for (int i = 0; i < numAllBlocks; i++) {
|
|
|
+ if (blockLengths.get(i) == expectedBlockLengths[i]) {
|
|
|
+ numBlocksWithCorrectLength++;
|
|
|
}
|
|
|
}
|
|
|
- assert minStripeNum != Integer.MAX_VALUE;
|
|
|
- return minStripeNum;
|
|
|
+ if (numBlocksWithCorrectLength >= numDataBlocks) {
|
|
|
+ ackedLength = sentBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ return ackedLength;
|
|
|
}
|
|
|
|
|
|
private int stripeDataSize() {
|