|
@@ -18,25 +18,20 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
|
|
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
|
|
|
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
|
|
|
-
|
|
|
import java.io.IOException;
|
|
|
import java.io.InterruptedIOException;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
-
|
|
|
import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
|
|
|
import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
|
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
|
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
|
+import org.apache.hadoop.io.erasurecode.ECSchema;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
@@ -109,6 +104,8 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
private final Coordinator coordinator;
|
|
|
private final int index;
|
|
|
private volatile boolean failed;
|
|
|
+ private final ECSchema schema;
|
|
|
+ private final int cellSize;
|
|
|
|
|
|
StripedDataStreamer(HdfsFileStatus stat,
|
|
|
DFSClient dfsClient, String src,
|
|
@@ -120,6 +117,8 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
byteArrayManage, favoredNodes);
|
|
|
this.index = index;
|
|
|
this.coordinator = coordinator;
|
|
|
+ this.schema = stat.getErasureCodingPolicy().getSchema();
|
|
|
+ this.cellSize = stat.getErasureCodingPolicy().getCellSize();
|
|
|
}
|
|
|
|
|
|
int getIndex() {
|
|
@@ -135,7 +134,7 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
}
|
|
|
|
|
|
private boolean isParityStreamer() {
|
|
|
- return index >= NUM_DATA_BLOCKS;
|
|
|
+ return index >= schema.getNumDataUnits();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -168,7 +167,7 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
if (block != null) {
|
|
|
// set numByte for the previous block group
|
|
|
long bytes = 0;
|
|
|
- for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
|
|
|
+ for (int i = 0; i < schema.getNumDataUnits(); i++) {
|
|
|
final ExtendedBlock b = coordinator.takeEndBlock(i);
|
|
|
StripedBlockUtil.checkBlocks(index, block, i, b);
|
|
|
bytes += b.getNumBytes();
|
|
@@ -183,15 +182,15 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
|
|
|
final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
|
|
|
excludedNodes);
|
|
|
- if (lb.getLocations().length < HdfsConstants.NUM_DATA_BLOCKS) {
|
|
|
+ if (lb.getLocations().length < schema.getNumDataUnits()) {
|
|
|
throw new IOException(
|
|
|
"Failed to get datablocks number of nodes from namenode: blockGroupSize= "
|
|
|
- + (HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
|
|
|
+ + (schema.getNumDataUnits() + schema.getNumParityUnits())
|
|
|
+ ", blocks.length= " + lb.getLocations().length);
|
|
|
}
|
|
|
- final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
|
|
|
- (LocatedStripedBlock)lb,
|
|
|
- BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
|
|
|
+ final LocatedBlock[] blocks =
|
|
|
+ StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) lb,
|
|
|
+ cellSize, schema.getNumDataUnits(), schema.getNumParityUnits());
|
|
|
|
|
|
for (int i = 0; i < blocks.length; i++) {
|
|
|
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
|
|
@@ -233,9 +232,10 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
final LocatedBlock updated = callUpdateBlockForPipeline(bg);
|
|
|
final long newGS = updated.getBlock().getGenerationStamp();
|
|
|
final LocatedBlock[] updatedBlks = StripedBlockUtil
|
|
|
- .parseStripedBlockGroup((LocatedStripedBlock) updated,
|
|
|
- BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
|
|
|
- for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
|
|
|
+ .parseStripedBlockGroup((LocatedStripedBlock) updated, cellSize,
|
|
|
+ schema.getNumDataUnits(), schema.getNumParityUnits());
|
|
|
+ for (int i = 0; i < schema.getNumDataUnits()
|
|
|
+ + schema.getNumParityUnits(); i++) {
|
|
|
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
|
|
|
if (si.isFailed()) {
|
|
|
continue; // skipping failed data streamer
|
|
@@ -280,7 +280,7 @@ public class StripedDataStreamer extends DataStreamer {
|
|
|
final ExtendedBlock bg = coordinator.getBlockGroup();
|
|
|
final ExtendedBlock newBG = newBlock(bg, newGS);
|
|
|
|
|
|
- final int n = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
|
|
|
+ final int n = schema.getNumDataUnits() + schema.getNumParityUnits();
|
|
|
final DatanodeInfo[] newNodes = new DatanodeInfo[n];
|
|
|
final String[] newStorageIDs = new String[n];
|
|
|
for (int i = 0; i < n; i++) {
|