|
@@ -452,8 +452,8 @@ public class TestBlockManager {
|
|
String src = "/test-file";
|
|
String src = "/test-file";
|
|
Path file = new Path(src);
|
|
Path file = new Path(src);
|
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
- cluster.waitActive();
|
|
|
|
try {
|
|
try {
|
|
|
|
+ cluster.waitActive();
|
|
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
|
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
|
FileSystem fs = cluster.getFileSystem();
|
|
FileSystem fs = cluster.getFileSystem();
|
|
NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
|
NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
|
@@ -492,7 +492,9 @@ public class TestBlockManager {
|
|
IOUtils.closeStream(out);
|
|
IOUtils.closeStream(out);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- cluster.shutdown();
|
|
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1043,7 +1045,9 @@ public class TestBlockManager {
|
|
assertTrue(fs.exists(file1));
|
|
assertTrue(fs.exists(file1));
|
|
fs.delete(file1, true);
|
|
fs.delete(file1, true);
|
|
assertTrue(!fs.exists(file1));
|
|
assertTrue(!fs.exists(file1));
|
|
- cluster.shutdown();
|
|
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1143,7 +1147,9 @@ public class TestBlockManager {
|
|
assertEquals(0, bm.getBlockOpQueueLength());
|
|
assertEquals(0, bm.getBlockOpQueueLength());
|
|
assertTrue(doneLatch.await(1, TimeUnit.SECONDS));
|
|
assertTrue(doneLatch.await(1, TimeUnit.SECONDS));
|
|
} finally {
|
|
} finally {
|
|
- cluster.shutdown();
|
|
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1218,7 +1224,9 @@ public class TestBlockManager {
|
|
long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
|
|
long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
|
|
assertTrue(batched > 0);
|
|
assertTrue(batched > 0);
|
|
} finally {
|
|
} finally {
|
|
- cluster.shutdown();
|
|
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1227,76 +1235,83 @@ public class TestBlockManager {
|
|
final Configuration conf = new HdfsConfiguration();
|
|
final Configuration conf = new HdfsConfiguration();
|
|
final MiniDFSCluster cluster =
|
|
final MiniDFSCluster cluster =
|
|
new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
|
new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
|
- cluster.waitActive();
|
|
|
|
- BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
|
|
|
- FileSystem fs = cluster.getFileSystem();
|
|
|
|
- final Path filePath = new Path("/tmp.txt");
|
|
|
|
- final long fileLen = 1L;
|
|
|
|
- DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
|
|
|
|
- DFSTestUtil.waitForReplication((DistributedFileSystem)fs,
|
|
|
|
- filePath, (short) 3, 60000);
|
|
|
|
- ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
|
|
|
- assertEquals(datanodes.size(), 4);
|
|
|
|
- FSNamesystem ns = cluster.getNamesystem();
|
|
|
|
- // get the block
|
|
|
|
- final String bpid = cluster.getNamesystem().getBlockPoolId();
|
|
|
|
- File storageDir = cluster.getInstanceStorageDir(0, 0);
|
|
|
|
- File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
|
|
|
|
- assertTrue("Data directory does not exist", dataDir.exists());
|
|
|
|
- BlockInfo blockInfo = blockManager.blocksMap.getBlocks().iterator().next();
|
|
|
|
- ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
|
|
|
|
- blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
|
|
|
|
- DatanodeDescriptor failedStorageDataNode =
|
|
|
|
- blockManager.getStoredBlock(blockInfo).getDatanode(0);
|
|
|
|
- DatanodeDescriptor corruptStorageDataNode =
|
|
|
|
- blockManager.getStoredBlock(blockInfo).getDatanode(1);
|
|
|
|
-
|
|
|
|
- ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
|
|
|
|
- for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
|
|
|
|
- DatanodeStorageInfo storageInfo = failedStorageDataNode
|
|
|
|
- .getStorageInfos()[i];
|
|
|
|
- DatanodeStorage dns = new DatanodeStorage(
|
|
|
|
- failedStorageDataNode.getStorageInfos()[i].getStorageID(),
|
|
|
|
- DatanodeStorage.State.FAILED,
|
|
|
|
- failedStorageDataNode.getStorageInfos()[i].getStorageType());
|
|
|
|
- while(storageInfo.getBlockIterator().hasNext()) {
|
|
|
|
- BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
|
|
|
|
- if(blockInfo1.equals(blockInfo)) {
|
|
|
|
- StorageReport report = new StorageReport(
|
|
|
|
- dns, true, storageInfo.getCapacity(),
|
|
|
|
- storageInfo.getDfsUsed(), storageInfo.getRemaining(),
|
|
|
|
- storageInfo.getBlockPoolUsed(), 0L);
|
|
|
|
- reports.add(report);
|
|
|
|
- break;
|
|
|
|
|
|
+ try {
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
|
|
|
+ FileSystem fs = cluster.getFileSystem();
|
|
|
|
+ final Path filePath = new Path("/tmp.txt");
|
|
|
|
+ final long fileLen = 1L;
|
|
|
|
+ DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
|
|
|
|
+ DFSTestUtil.waitForReplication((DistributedFileSystem)fs,
|
|
|
|
+ filePath, (short) 3, 60000);
|
|
|
|
+ ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
|
|
|
+ assertEquals(datanodes.size(), 4);
|
|
|
|
+ FSNamesystem ns = cluster.getNamesystem();
|
|
|
|
+ // get the block
|
|
|
|
+ final String bpid = cluster.getNamesystem().getBlockPoolId();
|
|
|
|
+ File storageDir = cluster.getInstanceStorageDir(0, 0);
|
|
|
|
+ File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
|
|
|
|
+ assertTrue("Data directory does not exist", dataDir.exists());
|
|
|
|
+ BlockInfo blockInfo =
|
|
|
|
+ blockManager.blocksMap.getBlocks().iterator().next();
|
|
|
|
+ ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
|
|
|
|
+ blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
|
|
|
|
+ DatanodeDescriptor failedStorageDataNode =
|
|
|
|
+ blockManager.getStoredBlock(blockInfo).getDatanode(0);
|
|
|
|
+ DatanodeDescriptor corruptStorageDataNode =
|
|
|
|
+ blockManager.getStoredBlock(blockInfo).getDatanode(1);
|
|
|
|
+
|
|
|
|
+ ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
|
|
|
|
+ for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
|
|
|
|
+ DatanodeStorageInfo storageInfo = failedStorageDataNode
|
|
|
|
+ .getStorageInfos()[i];
|
|
|
|
+ DatanodeStorage dns = new DatanodeStorage(
|
|
|
|
+ failedStorageDataNode.getStorageInfos()[i].getStorageID(),
|
|
|
|
+ DatanodeStorage.State.FAILED,
|
|
|
|
+ failedStorageDataNode.getStorageInfos()[i].getStorageType());
|
|
|
|
+ while(storageInfo.getBlockIterator().hasNext()) {
|
|
|
|
+ BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
|
|
|
|
+ if(blockInfo1.equals(blockInfo)) {
|
|
|
|
+ StorageReport report = new StorageReport(
|
|
|
|
+ dns, true, storageInfo.getCapacity(),
|
|
|
|
+ storageInfo.getDfsUsed(), storageInfo.getRemaining(),
|
|
|
|
+ storageInfo.getBlockPoolUsed(), 0L);
|
|
|
|
+ reports.add(report);
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
- failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
|
|
|
|
- .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
|
|
|
|
- ns.writeLock();
|
|
|
|
- DatanodeStorageInfo corruptStorageInfo= null;
|
|
|
|
- for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
|
|
|
|
- corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
|
|
|
|
- while(corruptStorageInfo.getBlockIterator().hasNext()) {
|
|
|
|
- BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
|
|
|
|
- if (blockInfo1.equals(blockInfo)) {
|
|
|
|
- break;
|
|
|
|
|
|
+ failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
|
|
|
|
+ .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
|
|
|
|
+ ns.writeLock();
|
|
|
|
+ DatanodeStorageInfo corruptStorageInfo= null;
|
|
|
|
+ for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
|
|
|
|
+ corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
|
|
|
|
+ while(corruptStorageInfo.getBlockIterator().hasNext()) {
|
|
|
|
+ BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
|
|
|
|
+ if (blockInfo1.equals(blockInfo)) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
|
|
|
|
+ corruptStorageInfo.getStorageID(),
|
|
|
|
+ CorruptReplicasMap.Reason.ANY.toString());
|
|
|
|
+ ns.writeUnlock();
|
|
|
|
+ BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
|
|
|
|
+ ns.readLock();
|
|
|
|
+ LocatedBlocks locatedBlocks =
|
|
|
|
+ blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
|
|
|
|
+ false, false, null, null);
|
|
|
|
+ assertTrue("Located Blocks should exclude corrupt" +
|
|
|
|
+ "replicas and failed storages",
|
|
|
|
+ locatedBlocks.getLocatedBlocks().size() == 1);
|
|
|
|
+ ns.readUnlock();
|
|
|
|
+ } finally {
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
|
|
|
|
- corruptStorageInfo.getStorageID(),
|
|
|
|
- CorruptReplicasMap.Reason.ANY.toString());
|
|
|
|
- ns.writeUnlock();
|
|
|
|
- BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
|
|
|
|
- ns.readLock();
|
|
|
|
- LocatedBlocks locatedBlocks =
|
|
|
|
- blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
|
|
|
|
- false, false, null, null);
|
|
|
|
- assertTrue("Located Blocks should exclude corrupt" +
|
|
|
|
- "replicas and failed storages",
|
|
|
|
- locatedBlocks.getLocatedBlocks().size() == 1);
|
|
|
|
- ns.readUnlock();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|