|
@@ -27,6 +27,7 @@ import java.util.Collections;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -250,6 +251,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
private ExtendedBlock currentBlockGroup;
|
|
|
private final String[] favoredNodes;
|
|
|
private final List<StripedDataStreamer> failedStreamers;
|
|
|
+ private final Map<Integer, Integer> corruptBlockCountMap;
|
|
|
+ private int blockGroupIndex;
|
|
|
|
|
|
/** Construct a new output stream for creating a file. */
|
|
|
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
|
@@ -268,6 +271,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
numAllBlocks = numDataBlocks + numParityBlocks;
|
|
|
this.favoredNodes = favoredNodes;
|
|
|
failedStreamers = new ArrayList<>();
|
|
|
+ corruptBlockCountMap = new LinkedHashMap<>();
|
|
|
|
|
|
encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
|
|
|
numDataBlocks, numParityBlocks);
|
|
@@ -444,6 +448,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
}
|
|
|
// assign the new block to the current block group
|
|
|
currentBlockGroup = lb.getBlock();
|
|
|
+ blockGroupIndex++;
|
|
|
|
|
|
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
|
|
|
(LocatedStripedBlock) lb, cellSize, numDataBlocks,
|
|
@@ -590,6 +595,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
while (newFailed.size() > 0) {
|
|
|
failedStreamers.addAll(newFailed);
|
|
|
coordinator.clearFailureStates();
|
|
|
+ corruptBlockCountMap.put(blockGroupIndex, failedStreamers.size());
|
|
|
|
|
|
// mark all the healthy streamers as external error
|
|
|
Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
|
|
@@ -957,6 +963,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
dfsClient.getTracer().newScope("completeFile")) {
|
|
|
completeFile(currentBlockGroup);
|
|
|
}
|
|
|
+ logCorruptBlocks();
|
|
|
} catch (ClosedChannelException ignored) {
|
|
|
} finally {
|
|
|
setClosed();
|
|
@@ -1004,6 +1011,20 @@ public class DFSStripedOutputStream extends DFSOutputStream {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void logCorruptBlocks() {
|
|
|
+ for (Map.Entry<Integer, Integer> entry : corruptBlockCountMap.entrySet()) {
|
|
|
+ int bgIndex = entry.getKey();
|
|
|
+ int corruptBlockCount = entry.getValue();
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("Block group <").append(bgIndex).append("> has ")
|
|
|
+ .append(corruptBlockCount).append(" corrupt blocks.");
|
|
|
+ if (corruptBlockCount == numAllBlocks - numDataBlocks) {
|
|
|
+ sb.append(" It's at high risk of losing data.");
|
|
|
+ }
|
|
|
+ LOG.warn(sb.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
ExtendedBlock getBlock() {
|
|
|
return currentBlockGroup;
|