|
@@ -269,6 +269,7 @@ public class TestDatanodeBlockScanner {
|
|
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
|
|
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L);
|
|
|
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
|
|
cluster.waitActive();
|
|
@@ -276,35 +277,47 @@ public class TestDatanodeBlockScanner {
|
|
|
Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
|
|
|
DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
|
|
|
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
|
|
|
+ final int ITERATIONS = 10;
|
|
|
|
|
|
// Wait until block is replicated to numReplicas
|
|
|
DFSTestUtil.waitReplication(fs, file1, numReplicas);
|
|
|
|
|
|
- // Corrupt numCorruptReplicas replicas of block
|
|
|
- int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
|
|
|
- for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
|
|
|
- if (corruptReplica(block, i)) {
|
|
|
- corruptReplicasDNIDs[j++] = i;
|
|
|
- LOG.info("successfully corrupted block " + block + " on node "
|
|
|
- + i + " " + cluster.getDataNodes().get(i).getDisplayName());
|
|
|
+ for (int k = 0; ; k++) {
|
|
|
+ // Corrupt numCorruptReplicas replicas of block
|
|
|
+ int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
|
|
|
+ for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
|
|
|
+ if (corruptReplica(block, i)) {
|
|
|
+ corruptReplicasDNIDs[j++] = i;
|
|
|
+ LOG.info("successfully corrupted block " + block + " on node "
|
|
|
+ + i + " " + cluster.getDataNodes().get(i).getDisplayName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Restart the datanodes containing corrupt replicas
|
|
|
+ // so they would be reported to namenode and re-replicated
|
|
|
+ // They MUST be restarted in reverse order from highest to lowest index,
|
|
|
+ // because the act of restarting them removes them from the ArrayList
|
|
|
+ // and causes the indexes of all nodes above them in the list to change.
|
|
|
+ for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
|
|
|
+ LOG.info("restarting node with corrupt replica: position "
|
|
|
+ + i + " node " + corruptReplicasDNIDs[i] + " "
|
|
|
+ + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
|
|
|
+ cluster.restartDataNode(corruptReplicasDNIDs[i]);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- // Restart the datanodes containing corrupt replicas
|
|
|
- // so they would be reported to namenode and re-replicated
|
|
|
- // They MUST be restarted in reverse order from highest to lowest index,
|
|
|
- // because the act of restarting them removes them from the ArrayList
|
|
|
- // and causes the indexes of all nodes above them in the list to change.
|
|
|
- for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
|
|
|
- LOG.info("restarting node with corrupt replica: position "
|
|
|
- + i + " node " + corruptReplicasDNIDs[i] + " "
|
|
|
- + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
|
|
|
- cluster.restartDataNode(corruptReplicasDNIDs[i]);
|
|
|
- }
|
|
|
|
|
|
- // Loop until all corrupt replicas are reported
|
|
|
- DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
|
|
|
- block, numCorruptReplicas);
|
|
|
+ // Loop until all corrupt replicas are reported
|
|
|
+ try {
|
|
|
+ DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
|
|
|
+ block, numCorruptReplicas);
|
|
|
+ } catch(TimeoutException e) {
|
|
|
+ if (k > ITERATIONS) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
// Loop until the block recovers after replication
|
|
|
DFSTestUtil.waitReplication(fs, file1, numReplicas);
|