|
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map.Entry;
|
|
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -80,17 +82,17 @@ public class TestBlockManager {
|
|
|
Mockito.doReturn(true).when(fsn).hasWriteLock();
|
|
|
bm = new BlockManager(fsn, fsn, conf);
|
|
|
nodes = ImmutableList.of(
|
|
|
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackB"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackB"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackB")
|
|
|
- );
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackB", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackB", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackB", true)
|
|
|
+ );
|
|
|
rackA = nodes.subList(0, 3);
|
|
|
rackB = nodes.subList(3, 6);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
|
|
|
NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
|
|
|
// construct network topology
|
|
@@ -282,6 +284,7 @@ public class TestBlockManager {
|
|
|
// the third off-rack replica.
|
|
|
DatanodeDescriptor rackCNode =
|
|
|
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/rackC");
|
|
|
+ rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
|
|
|
addNodes(ImmutableList.of(rackCNode));
|
|
|
try {
|
|
|
DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
|
|
@@ -322,15 +325,15 @@ public class TestBlockManager {
|
|
|
@Test
|
|
|
public void testBlocksAreNotUnderreplicatedInSingleRack() throws Exception {
|
|
|
List<DatanodeDescriptor> nodes = ImmutableList.of(
|
|
|
- DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackA"),
|
|
|
- DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackA")
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackA", true),
|
|
|
+ BlockManagerTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackA", true)
|
|
|
);
|
|
|
addNodes(nodes);
|
|
|
- List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);;
|
|
|
+ List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
|
|
|
for (int i = 0; i < NUM_TEST_ITERS; i++) {
|
|
|
doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
|
|
|
}
|
|
@@ -353,7 +356,17 @@ public class TestBlockManager {
|
|
|
private void fulfillPipeline(BlockInfo blockInfo,
|
|
|
DatanodeDescriptor[] pipeline) throws IOException {
|
|
|
for (int i = 1; i < pipeline.length; i++) {
|
|
|
- bm.addBlock(pipeline[i], "STORAGE_ID", blockInfo, null);
|
|
|
+ DatanodeDescriptor dn = pipeline[i];
|
|
|
+
|
|
|
+ Iterator<DatanodeStorageInfo> iterator = dn.getStorageInfos().iterator();
|
|
|
+ if (iterator.hasNext()) {
|
|
|
+ DatanodeStorageInfo storage = iterator.next();
|
|
|
+ bm.addBlock(dn, storage.getStorageID(), blockInfo, null);
|
|
|
+ blockInfo.addStorage(storage);
|
|
|
+ } else {
|
|
|
+ throw new RuntimeException("Storage info on node: " + dn.getHostName()
|
|
|
+ + " is invalid.");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -494,7 +507,10 @@ public class TestBlockManager {
|
|
|
@Test
|
|
|
public void testSafeModeIBR() throws Exception {
|
|
|
DatanodeDescriptor node = spy(nodes.get(0));
|
|
|
- node.setStorageID("dummy-storage");
|
|
|
+ Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
|
|
|
+ DatanodeStorageInfo ds = i.next();
|
|
|
+ node.setStorageID(ds.getStorageID());
|
|
|
+
|
|
|
node.isAlive = true;
|
|
|
|
|
|
DatanodeRegistration nodeReg =
|
|
@@ -510,12 +526,15 @@ public class TestBlockManager {
|
|
|
assertTrue(node.isFirstBlockReport());
|
|
|
// send block report, should be processed
|
|
|
reset(node);
|
|
|
- bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
|
|
|
+
|
|
|
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
|
|
+ new BlockListAsLongs(null, null));
|
|
|
verify(node).receivedBlockReport();
|
|
|
assertFalse(node.isFirstBlockReport());
|
|
|
// send block report again, should NOT be processed
|
|
|
reset(node);
|
|
|
- bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
|
|
|
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
|
|
+ new BlockListAsLongs(null, null));
|
|
|
verify(node, never()).receivedBlockReport();
|
|
|
assertFalse(node.isFirstBlockReport());
|
|
|
|
|
@@ -527,7 +546,8 @@ public class TestBlockManager {
|
|
|
assertTrue(node.isFirstBlockReport()); // ready for report again
|
|
|
// send block report, should be processed after restart
|
|
|
reset(node);
|
|
|
- bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
|
|
|
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
|
|
+ new BlockListAsLongs(null, null));
|
|
|
verify(node).receivedBlockReport();
|
|
|
assertFalse(node.isFirstBlockReport());
|
|
|
}
|
|
@@ -535,7 +555,9 @@ public class TestBlockManager {
|
|
|
@Test
|
|
|
public void testSafeModeIBRAfterIncremental() throws Exception {
|
|
|
DatanodeDescriptor node = spy(nodes.get(0));
|
|
|
- node.setStorageID("dummy-storage");
|
|
|
+ Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
|
|
|
+ DatanodeStorageInfo ds = i.next();
|
|
|
+ node.setStorageID(ds.getStorageID());
|
|
|
node.isAlive = true;
|
|
|
|
|
|
DatanodeRegistration nodeReg =
|
|
@@ -552,7 +574,8 @@ public class TestBlockManager {
|
|
|
// send block report while pretending to already have blocks
|
|
|
reset(node);
|
|
|
doReturn(1).when(node).numBlocks();
|
|
|
- bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
|
|
|
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
|
|
+ new BlockListAsLongs(null, null));
|
|
|
verify(node).receivedBlockReport();
|
|
|
assertFalse(node.isFirstBlockReport());
|
|
|
}
|