@@ -83,11 +83,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -102,6 +106,8 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
@@ -146,7 +152,21 @@ public class TestBlockManager {
     Mockito.doReturn(true).when(fsn).hasWriteLock();
     Mockito.doReturn(true).when(fsn).hasReadLock();
     Mockito.doReturn(true).when(fsn).isRunning();
+    // Make shouldPopulateReplQueues return true.
+    HAContext haContext = Mockito.mock(HAContext.class);
+    HAState haState = Mockito.mock(HAState.class);
+    Mockito.when(haContext.getState()).thenReturn(haState);
+    Mockito.when(haState.shouldPopulateReplQueues()).thenReturn(true);
+    Mockito.when(fsn.getHAContext()).thenReturn(haContext);
     bm = new BlockManager(fsn, false, conf);
+    bm.setInitializedReplQueues(true);
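+    // Stub a CacheManager that reports no cached blocks.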
+    CacheManager cm = Mockito.mock(CacheManager.class);
+    Mockito.doReturn(cm).when(fsn).getCacheManager();
+    GSet<CachedBlock, CachedBlock> cb =
+        new LightWeightGSet<CachedBlock, CachedBlock>(1);
+    Mockito.when(cm.getCachedBlocks()).thenReturn(cb);
+
     final String[] racks = {
         "/rackA",
         "/rackA",
@@ -522,7 +542,7 @@ public class TestBlockManager {
     }
     return ret;
   }
-
+
   private List<DatanodeDescriptor> startDecommission(int ... indexes) {
     List<DatanodeDescriptor> nodes = getNodes(indexes);
     for (DatanodeDescriptor node : nodes) {
@@ -918,6 +938,44 @@ public class TestBlockManager {
     return builder.build();
   }
 
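+  // Regression test: an under-construction block that is finalized and then
+  // immediately deleted on the same node must not be queued as missing.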
+  @Test
+  public void testUCBlockNotConsideredMissing() throws Exception {
+    DatanodeDescriptor node = nodes.get(0);
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
+    node.setAlive(true);
+    DatanodeRegistration nodeReg =
+        new DatanodeRegistration(node, null, null, "");
+
+    // register new node
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    bm.getDatanodeManager().addDatanode(node);
+
+    // Build an incremental report
+    List<ReceivedDeletedBlockInfo> rdbiList = new ArrayList<>();
+
+    // blk_42 is under construction, finalizes on one node and is
+    // immediately deleted on the same node
+    long blockId = 42;  // arbitrary
+    BlockInfo receivedBlock = addUcBlockToBM(blockId);
+
+    rdbiList.add(new ReceivedDeletedBlockInfo(new Block(receivedBlock),
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null));
+    rdbiList.add(new ReceivedDeletedBlockInfo(
+        new Block(blockId),
+        ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null));
+
+    // process IBR
+    StorageReceivedDeletedBlocks srdb =
+        new StorageReceivedDeletedBlocks(new DatanodeStorage(ds.getStorageID()),
+            rdbiList.toArray(new ReceivedDeletedBlockInfo[rdbiList.size()]));
+    bm.processIncrementalBlockReport(node, srdb);
+    // Needed replications should still be 0.
+    assertEquals("UC block was incorrectly added to needed replications",
+        0, bm.neededReconstruction.size());
+  }
+
   private BlockInfo addBlockToBM(long blkId) {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
@@ -1250,14 +1308,18 @@ public class TestBlockManager {
     FileInputStream fstream = new FileInputStream(file);
     DataInputStream in = new DataInputStream(fstream);
     BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+    String corruptBlocksLine;
+    boolean foundIt = false;
     try {
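+      // Scan for the corrupt blocks section instead of assuming a fixed offset.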
-      for(int i =0;i<6;i++) {
-        reader.readLine();
+      while ((corruptBlocksLine = reader.readLine()) != null) {
+        if (corruptBlocksLine.compareTo("Corrupt Blocks:") == 0) {
+          foundIt = true;
+          break;
+        }
       }
-      String corruptBlocksLine = reader.readLine();
-      assertEquals("Unexpected text in metasave," +
-          "was expecting corrupt blocks section!", 0,
-          corruptBlocksLine.compareTo("Corrupt Blocks:"));
+      assertTrue("Unexpected text in metasave, " +
+          "was expecting corrupt blocks section!", foundIt);
       corruptBlocksLine = reader.readLine();
       String regex = "Block=[0-9]+\\tNode=.*\\tStorageID=.*StorageState.*" +
           "TotalReplicas=.*Reason=GENSTAMP_MISMATCH";