|
@@ -260,6 +260,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
|
|
|
private final Coordinator coordinator;
|
|
private final Coordinator coordinator;
|
|
private final CellBuffers cellBuffers;
|
|
private final CellBuffers cellBuffers;
|
|
|
|
+ private final ErasureCodingPolicy ecPolicy;
|
|
private final RawErasureEncoder encoder;
|
|
private final RawErasureEncoder encoder;
|
|
private final List<StripedDataStreamer> streamers;
|
|
private final List<StripedDataStreamer> streamers;
|
|
private final DFSPacket[] currentPackets; // current Packet of each streamer
|
|
private final DFSPacket[] currentPackets; // current Packet of each streamer
|
|
@@ -286,7 +287,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
LOG.debug("Creating DFSStripedOutputStream for " + src);
|
|
LOG.debug("Creating DFSStripedOutputStream for " + src);
|
|
}
|
|
}
|
|
|
|
|
|
- final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
|
|
|
|
|
|
+ ecPolicy = stat.getErasureCodingPolicy();
|
|
final int numParityBlocks = ecPolicy.getNumParityUnits();
|
|
final int numParityBlocks = ecPolicy.getNumParityUnits();
|
|
cellSize = ecPolicy.getCellSize();
|
|
cellSize = ecPolicy.getCellSize();
|
|
numDataBlocks = ecPolicy.getNumDataUnits();
|
|
numDataBlocks = ecPolicy.getNumDataUnits();
|
|
@@ -478,11 +479,6 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
|
|
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
|
|
currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
|
|
currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
|
|
assert lb.isStriped();
|
|
assert lb.isStriped();
|
|
- if (lb.getLocations().length < numDataBlocks) {
|
|
|
|
- throw new IOException("Failed to get " + numDataBlocks
|
|
|
|
- + " nodes from namenode: blockGroupSize= " + numAllBlocks
|
|
|
|
- + ", blocks.length= " + lb.getLocations().length);
|
|
|
|
- }
|
|
|
|
// assign the new block to the current block group
|
|
// assign the new block to the current block group
|
|
currentBlockGroup = lb.getBlock();
|
|
currentBlockGroup = lb.getBlock();
|
|
blockGroupIndex++;
|
|
blockGroupIndex++;
|
|
@@ -494,11 +490,16 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
StripedDataStreamer si = getStripedDataStreamer(i);
|
|
StripedDataStreamer si = getStripedDataStreamer(i);
|
|
assert si.isHealthy();
|
|
assert si.isHealthy();
|
|
if (blocks[i] == null) {
|
|
if (blocks[i] == null) {
|
|
|
|
+ // allocBlock() should guarantee that all data blocks are successfully
|
|
|
|
+ // allocated.
|
|
|
|
+ assert i >= numDataBlocks;
|
|
// Set exception and close streamer as there is no block locations
|
|
// Set exception and close streamer as there is no block locations
|
|
// found for the parity block.
|
|
// found for the parity block.
|
|
- LOG.warn("Failed to get block location for parity block, index=" + i);
|
|
|
|
|
|
+ LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
|
|
|
|
+ "Not enough datanodes? Exclude nodes={}", i, ecPolicy.getName(),
|
|
|
|
+ excludedNodes);
|
|
si.getLastException().set(
|
|
si.getLastException().set(
|
|
- new IOException("Failed to get following block, i=" + i));
|
|
|
|
|
|
+ new IOException("Failed to get parity block, index=" + i));
|
|
si.getErrorState().setInternalError();
|
|
si.getErrorState().setInternalError();
|
|
si.close(true);
|
|
si.close(true);
|
|
} else {
|
|
} else {
|