|
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.ECInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
|
|
|
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
@@ -278,14 +277,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
return numDataBlocks * cellSize;
|
|
|
}
|
|
|
|
|
|
- private long getCurrentBlockGroupBytes() {
|
|
|
- long sum = 0;
|
|
|
- for (int i = 0; i < numDataBlocks; i++) {
|
|
|
- sum += streamers.get(i).getBytesCurBlock();
|
|
|
- }
|
|
|
- return sum;
|
|
|
- }
|
|
|
-
|
|
|
private void notSupported(String headMsg)
|
|
|
throws IOException{
|
|
|
throw new IOException(
|
|
@@ -347,37 +338,43 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Simply add bytesCurBlock together. Note that this result is not accurately
|
|
|
+ * the size of the block group.
|
|
|
+ */
|
|
|
+ private long getCurrentSumBytes() {
|
|
|
+ long sum = 0;
|
|
|
+ for (int i = 0; i < numDataBlocks; i++) {
|
|
|
+ sum += streamers.get(i).getBytesCurBlock();
|
|
|
+ }
|
|
|
+ return sum;
|
|
|
+ }
|
|
|
+
|
|
|
private void writeParityCellsForLastStripe() throws IOException {
|
|
|
- final long currentBlockGroupBytes = getCurrentBlockGroupBytes();
|
|
|
- long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
|
|
|
- currentBlockGroupBytes, cellSize, numDataBlocks,
|
|
|
- numDataBlocks + 1);
|
|
|
- if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
|
|
|
+ final long currentBlockGroupBytes = getCurrentSumBytes();
|
|
|
+ if (currentBlockGroupBytes % stripeDataSize() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
- int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
|
|
|
- (int) (parityBlkSize % cellSize);
|
|
|
+ long firstCellSize = getLeadingStreamer().getBytesCurBlock() % cellSize;
|
|
|
+ long parityCellSize = firstCellSize > 0 && firstCellSize < cellSize ?
|
|
|
+ firstCellSize : cellSize;
|
|
|
|
|
|
for (int i = 0; i < numAllBlocks; i++) {
|
|
|
- long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
|
|
|
- currentBlockGroupBytes, cellSize, numDataBlocks, i);
|
|
|
// Pad zero bytes to make all cells exactly the size of parityCellSize
|
|
|
// If internal block is smaller than parity block, pad zero bytes.
|
|
|
// Also pad zero bytes to all parity cells
|
|
|
- if (internalBlkLen < parityBlkSize || i >= numDataBlocks) {
|
|
|
- int position = cellBuffers[i].position();
|
|
|
- assert position <= parityCellSize : "If an internal block is smaller" +
|
|
|
- " than parity block, then its last cell should be small than last" +
|
|
|
- " parity cell";
|
|
|
- for (int j = 0; j < parityCellSize - position; j++) {
|
|
|
- cellBuffers[i].put((byte) 0);
|
|
|
- }
|
|
|
+ int position = cellBuffers[i].position();
|
|
|
+ assert position <= parityCellSize : "If an internal block is smaller" +
|
|
|
+ " than parity block, then its last cell should be small than last" +
|
|
|
+ " parity cell";
|
|
|
+ for (int j = 0; j < parityCellSize - position; j++) {
|
|
|
+ cellBuffers[i].put((byte) 0);
|
|
|
}
|
|
|
cellBuffers[i].flip();
|
|
|
}
|
|
|
encode(cellBuffers);
|
|
|
|
|
|
- //write parity cells
|
|
|
+ // write parity cells
|
|
|
curIdx = numDataBlocks;
|
|
|
refreshStreamer();
|
|
|
for (int i = numDataBlocks; i < numAllBlocks; i++) {
|