|
@@ -272,9 +272,8 @@ public class TestBlockReport {
|
|
|
Path filePath = new Path("/" + METHOD_NAME + ".dat");
|
|
|
final int DN_N1 = DN_N0 + 1;
|
|
|
|
|
|
- ArrayList<Block> blocks =
|
|
|
- writeFileAndStartDN(METHOD_NAME,
|
|
|
- FILE_SIZE, filePath, true);
|
|
|
+ ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
|
|
|
+ startDNandWait(filePath, true);
|
|
|
|
|
|
cluster.getNameNode().blockReport(
|
|
|
cluster.getDataNodes().get(DN_N1).dnRegistration,
|
|
@@ -306,8 +305,8 @@ public class TestBlockReport {
|
|
|
final int DN_N1 = DN_N0 + 1;
|
|
|
|
|
|
// write file and start second node to be "older" than the original
|
|
|
- ArrayList<Block> blocks = writeFileAndStartDN(METHOD_NAME,
|
|
|
- FILE_SIZE, filePath, true);
|
|
|
+ ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
|
|
|
+ startDNandWait(filePath, true);
|
|
|
|
|
|
int randIndex = rand.nextInt(blocks.size());
|
|
|
// Get a block and screw its GS
|
|
@@ -378,12 +377,14 @@ public class TestBlockReport {
|
|
|
shutDownCluster();
|
|
|
startUpCluster();
|
|
|
|
|
|
- // write file and start second node to be "older" than the original
|
|
|
try {
|
|
|
ArrayList<Block> blocks =
|
|
|
- writeFileAndStartDN(METHOD_NAME, 6 * bytesChkSum, filePath, false);
|
|
|
+ writeFile(METHOD_NAME, 6 * bytesChkSum, filePath);
|
|
|
+ Block bl = findBlock(filePath, 6 * bytesChkSum);
|
|
|
+ BlockChecker bc = new BlockChecker(filePath);
|
|
|
+ bc.start();
|
|
|
|
|
|
- prepareSecondReplica(filePath, DN_N1);
|
|
|
+ waitForTempReplica(bl, DN_N1);
|
|
|
|
|
|
cluster.getNameNode().blockReport(
|
|
|
cluster.getDataNodes().get(DN_N1).dnRegistration,
|
|
@@ -391,6 +392,10 @@ public class TestBlockReport {
|
|
|
printStats();
|
|
|
assertEquals("Wrong number of PendingReplication blocks",
|
|
|
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
|
|
|
+
|
|
|
+ try {
|
|
|
+ bc.join();
|
|
|
+ } catch (InterruptedException e) { }
|
|
|
} finally {
|
|
|
resetConfiguration(); // return the initial state of the configuration
|
|
|
}
|
|
@@ -414,28 +419,78 @@ public class TestBlockReport {
|
|
|
|
|
|
try {
|
|
|
ArrayList<Block> blocks =
|
|
|
- writeFileAndStartDN(METHOD_NAME, 6 * bytesChkSum, filePath, false);
|
|
|
- Block b = prepareSecondReplica(filePath, DN_N1);
|
|
|
- corruptBlockGS(b);
|
|
|
- corruptBlockLen(b);
|
|
|
+ writeFile(METHOD_NAME, 6 * bytesChkSum, filePath);
|
|
|
+
|
|
|
+ Block bl = findBlock(filePath, 6 * bytesChkSum);
|
|
|
+ BlockChecker bc = new BlockChecker(filePath);
|
|
|
+ bc.start();
|
|
|
+ corruptBlockGS(bl);
|
|
|
+ corruptBlockLen(bl);
|
|
|
|
|
|
- DatanodeCommand dnC = cluster.getNameNode().blockReport(
|
|
|
+ waitForTempReplica(bl, DN_N1);
|
|
|
+
|
|
|
+ cluster.getNameNode().blockReport(
|
|
|
cluster.getDataNodes().get(DN_N1).dnRegistration,
|
|
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
|
|
|
- LOG.debug("Getting command back: " + dnC);
|
|
|
printStats();
|
|
|
assertEquals("Wrong number of PendingReplication blocks",
|
|
|
2, cluster.getNamesystem().getPendingReplicationBlocks());
|
|
|
+
|
|
|
+ try {
|
|
|
+ bc.join();
|
|
|
+ } catch (InterruptedException e) {}
|
|
|
} finally {
|
|
|
resetConfiguration(); // return the initial state of the configuration
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void waitForTempReplica(Block bl, int DN_N1) {
|
|
|
+ final boolean tooLongWait = false;
|
|
|
+ final int TIMEOUT = 40000;
|
|
|
+
|
|
|
+ LOG.debug("Wait for datanode " + DN_N1 + " to appear");
|
|
|
+ while (cluster.getDataNodes().size() <= DN_N1) {
|
|
|
+ waitTil(20);
|
|
|
+ }
|
|
|
+ LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
|
|
|
+ // Look about specified DN for the replica of the block from 1st DN
|
|
|
+ Replica r;
|
|
|
+ r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
|
|
|
+ fetchReplicaInfo(bl.getBlockId());
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ int count = 0;
|
|
|
+ while (r == null) {
|
|
|
+ waitTil(50);
|
|
|
+ r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
|
|
|
+ fetchReplicaInfo(bl.getBlockId());
|
|
|
+ long waiting_period = System.currentTimeMillis() - start;
|
|
|
+ if (count++ % 10 == 0)
|
|
|
+ LOG.debug("Has been waiting for " + waiting_period + " ms.");
|
|
|
+ if (waiting_period > TIMEOUT)
|
|
|
+ assertTrue("Was waiting too long to get ReplicaInfo from a datanode",
|
|
|
+ tooLongWait);
|
|
|
+ }
|
|
|
+
|
|
|
+ HdfsConstants.ReplicaState state = r.getState();
|
|
|
+ LOG.debug("Replica state before the loop " + state.getValue());
|
|
|
+ start = System.currentTimeMillis();
|
|
|
+ while (state != HdfsConstants.ReplicaState.TEMPORARY) {
|
|
|
+ waitTil(100);
|
|
|
+ state = r.getState();
|
|
|
+ LOG.debug("Keep waiting for " + bl.getBlockName() +
|
|
|
+ " is in state " + state.getValue());
|
|
|
+ if (System.currentTimeMillis() - start > TIMEOUT)
|
|
|
+ assertTrue("Was waiting too long for a replica to become TEMPORARY",
|
|
|
+ tooLongWait);
|
|
|
+ }
|
|
|
+ LOG.debug("Replica state after the loop " + state.getValue());
|
|
|
+ }
|
|
|
+
|
|
|
// Helper methods from here below...
|
|
|
- private ArrayList<Block> writeFileAndStartDN(final String METHOD_NAME,
|
|
|
+ // Write file and start second data node.
|
|
|
+ private ArrayList<Block> writeFile(final String METHOD_NAME,
|
|
|
final long fileSize,
|
|
|
- Path filePath,
|
|
|
- boolean waitReplicas)
|
|
|
+ Path filePath)
|
|
|
throws IOException {
|
|
|
ArrayList<Block> blocks = null;
|
|
|
try {
|
|
@@ -444,6 +499,12 @@ public class TestBlockReport {
|
|
|
} catch (IOException e) {
|
|
|
LOG.debug("Caught exception ", e);
|
|
|
}
|
|
|
+ return blocks;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startDNandWait(Path filePath, boolean waitReplicas)
|
|
|
+ throws IOException {
|
|
|
+ LOG.debug("Before next DN start: " + cluster.getDataNodes().size());
|
|
|
cluster.startDataNodes(conf, 1, true, null, null);
|
|
|
ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
|
|
assertEquals(datanodes.size(), 2);
|
|
@@ -452,7 +513,6 @@ public class TestBlockReport {
|
|
|
+ cluster.getDataNodes().get(datanodes.size() - 1)
|
|
|
.getDatanodeRegistration() + " has been started");
|
|
|
if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
|
|
|
- return blocks;
|
|
|
}
|
|
|
|
|
|
private ArrayList<Block> prepareForRide(final Path filePath,
|
|
@@ -543,6 +603,9 @@ public class TestBlockReport {
|
|
|
|
|
|
private void corruptBlockLen(final Block block)
|
|
|
throws IOException {
|
|
|
+ if (block == null) {
|
|
|
+ throw new IOException("Block isn't suppose to be null");
|
|
|
+ }
|
|
|
long oldLen = block.getNumBytes();
|
|
|
long newLen = oldLen - rand.nextLong();
|
|
|
assertTrue("Old and new length shouldn't be the same",
|
|
@@ -554,6 +617,9 @@ public class TestBlockReport {
|
|
|
|
|
|
private void corruptBlockGS(final Block block)
|
|
|
throws IOException {
|
|
|
+ if (block == null) {
|
|
|
+ throw new IOException("Block isn't suppose to be null");
|
|
|
+ }
|
|
|
long oldGS = block.getGenerationStamp();
|
|
|
long newGS = oldGS - rand.nextLong();
|
|
|
assertTrue("Old and new GS shouldn't be the same",
|
|
@@ -563,49 +629,33 @@ public class TestBlockReport {
|
|
|
" is changed to " + block.getGenerationStamp() + " from " + oldGS);
|
|
|
}
|
|
|
|
|
|
- // The method simply start second node and wait until a TEMPORARY replica
|
|
|
- // appears on it.
|
|
|
- // Returns the block from the specified <code>nodeNum</code> datanode
|
|
|
- private Block prepareSecondReplica(Path filePath,
|
|
|
- int nodeNum) throws IOException {
|
|
|
+ private Block findBlock(Path path, long size) throws IOException {
|
|
|
+ Block ret;
|
|
|
+ List<LocatedBlock> lbs =
|
|
|
+ cluster.getNameNode().getBlockLocations(path.toString(),
|
|
|
+ FILE_START, size).getLocatedBlocks();
|
|
|
+ LocatedBlock lb = lbs.get(lbs.size() - 1);
|
|
|
|
|
|
- final boolean tooLongWait = false;
|
|
|
- final int TIMEOUT = 4000;
|
|
|
-
|
|
|
- List<LocatedBlock> lbs =
|
|
|
- cluster.getNameNode().getBlockLocations(filePath.toString(),
|
|
|
- FILE_START, FILE_SIZE).getLocatedBlocks();
|
|
|
- LocatedBlock lb = lbs.get(lbs.size() - 1);
|
|
|
-
|
|
|
- Block ret = cluster.getDataNodes().get(DN_N0).
|
|
|
- data.getStoredBlock(lb.getBlock().getBlockId());
|
|
|
- Replica r =
|
|
|
- ((FSDataset) cluster.getDataNodes().get(nodeNum).getFSDataset()).
|
|
|
- fetchReplicaInfo(ret.getBlockId());
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- while (r == null) {
|
|
|
- waitTil(50);
|
|
|
- r = ((FSDataset) cluster.getDataNodes().get(nodeNum).getFSDataset()).
|
|
|
- fetchReplicaInfo(ret.getBlockId());
|
|
|
- if (System.currentTimeMillis() - start > TIMEOUT)
|
|
|
- assertTrue("Was waiting too long to get ReplicaInfo from a datanode",
|
|
|
- tooLongWait);
|
|
|
- }
|
|
|
+ // Get block from the first DN
|
|
|
+ ret = cluster.getDataNodes().get(DN_N0).
|
|
|
+ data.getStoredBlock(lb.getBlock().getBlockId());
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
|
|
|
- HdfsConstants.ReplicaState state = r.getState();
|
|
|
- LOG.debug("Replica state before the loop " + state.getValue());
|
|
|
- start = System.currentTimeMillis();
|
|
|
- while (state != HdfsConstants.ReplicaState.TEMPORARY) {
|
|
|
- waitTil(100);
|
|
|
- state = r.getState();
|
|
|
- LOG.debug("Keep waiting for " + ret.getBlockName() +
|
|
|
- " is in state " + state.getValue());
|
|
|
- if (System.currentTimeMillis() - start > TIMEOUT)
|
|
|
- assertTrue("Was waiting too long for a replica to become TEMPORARY",
|
|
|
- tooLongWait);
|
|
|
+ private class BlockChecker extends Thread {
|
|
|
+ Path filePath;
|
|
|
+
|
|
|
+ public BlockChecker(final Path filePath) {
|
|
|
+ this.filePath = filePath;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ startDNandWait(filePath, true);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Shouldn't happen", e);
|
|
|
+ }
|
|
|
}
|
|
|
- LOG.debug("Replica state after the loop " + state.getValue());
|
|
|
- return ret;
|
|
|
}
|
|
|
|
|
|
private static void resetConfiguration() {
|