|
@@ -260,7 +260,6 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
|
|
|
|
private final Coordinator coordinator;
|
|
private final Coordinator coordinator;
|
|
private final CellBuffers cellBuffers;
|
|
private final CellBuffers cellBuffers;
|
|
- private final ErasureCodingPolicy ecPolicy;
|
|
|
|
private final RawErasureEncoder encoder;
|
|
private final RawErasureEncoder encoder;
|
|
private final List<StripedDataStreamer> streamers;
|
|
private final List<StripedDataStreamer> streamers;
|
|
private final DFSPacket[] currentPackets; // current Packet of each streamer
|
|
private final DFSPacket[] currentPackets; // current Packet of each streamer
|
|
@@ -287,7 +286,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
LOG.debug("Creating DFSStripedOutputStream for " + src);
|
|
LOG.debug("Creating DFSStripedOutputStream for " + src);
|
|
}
|
|
}
|
|
|
|
|
|
- ecPolicy = stat.getErasureCodingPolicy();
|
|
|
|
|
|
+ final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
|
|
final int numParityBlocks = ecPolicy.getNumParityUnits();
|
|
final int numParityBlocks = ecPolicy.getNumParityUnits();
|
|
cellSize = ecPolicy.getCellSize();
|
|
cellSize = ecPolicy.getCellSize();
|
|
numDataBlocks = ecPolicy.getNumDataUnits();
|
|
numDataBlocks = ecPolicy.getNumDataUnits();
|
|
@@ -479,6 +478,11 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
|
|
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
|
|
currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
|
|
currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
|
|
assert lb.isStriped();
|
|
assert lb.isStriped();
|
|
|
|
+ if (lb.getLocations().length < numDataBlocks) {
|
|
|
|
+ throw new IOException("Failed to get " + numDataBlocks
|
|
|
|
+ + " nodes from namenode: blockGroupSize= " + numAllBlocks
|
|
|
|
+ + ", blocks.length= " + lb.getLocations().length);
|
|
|
|
+ }
|
|
// assign the new block to the current block group
|
|
// assign the new block to the current block group
|
|
currentBlockGroup = lb.getBlock();
|
|
currentBlockGroup = lb.getBlock();
|
|
blockGroupIndex++;
|
|
blockGroupIndex++;
|
|
@@ -490,16 +494,11 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|
StripedDataStreamer si = getStripedDataStreamer(i);
|
|
StripedDataStreamer si = getStripedDataStreamer(i);
|
|
assert si.isHealthy();
|
|
assert si.isHealthy();
|
|
if (blocks[i] == null) {
|
|
if (blocks[i] == null) {
|
|
- // allocBlock() should guarantee that all data blocks are successfully
|
|
|
|
- // allocated.
|
|
|
|
- assert i >= numDataBlocks;
|
|
|
|
// Set exception and close streamer as there is no block locations
|
|
// Set exception and close streamer as there is no block locations
|
|
// found for the parity block.
|
|
// found for the parity block.
|
|
- LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
|
|
|
|
- "Not enough datanodes? Excluded nodes={}", i, ecPolicy.getName(),
|
|
|
|
- excludedNodes);
|
|
|
|
|
|
+ LOG.warn("Failed to get block location for parity block, index=" + i);
|
|
si.getLastException().set(
|
|
si.getLastException().set(
|
|
- new IOException("Failed to get parity block, index=" + i));
|
|
|
|
|
|
+ new IOException("Failed to get following block, i=" + i));
|
|
si.getErrorState().setInternalError();
|
|
si.getErrorState().setInternalError();
|
|
si.close(true);
|
|
si.close(true);
|
|
} else {
|
|
} else {
|