@@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
 import java.util.Random;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.conf.Configuration;
@@ -44,13 +45,23 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.net.NodeBase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -59,6 +70,7 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
 import static org.junit.Assert.*;
 
 public class TestNameNodeProvidedImplementation {
@@ -79,6 +91,7 @@ public class TestNameNodeProvidedImplementation {
   private final String filePrefix = "file";
   private final String fileSuffix = ".dat";
   private final int baseFileLen = 1024;
+  private long providedDataSize = 0;
 
   Configuration conf;
   MiniDFSCluster cluster;
@@ -135,6 +148,7 @@ public class TestNameNodeProvidedImplementation {
         }
         writer.flush();
         writer.close();
+        providedDataSize += newFile.length();
       } catch (IOException e) {
         e.printStackTrace();
       }
@@ -206,13 +220,14 @@ public class TestNameNodeProvidedImplementation {
     cluster.waitActive();
   }
 
-  @Test(timeout = 20000)
+  @Test(timeout=20000)
   public void testLoadImage() throws Exception {
     final long seed = r.nextLong();
     LOG.info("NAMEPATH: " + NAMEPATH);
     createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
-    startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED},
-        null, false);
+    startCluster(NNDIRPATH, 0,
+        new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
+        false);
 
     FileSystem fs = cluster.getFileSystem();
     for (TreePath e : new RandomTreeWalk(seed)) {
@@ -231,14 +246,83 @@ public class TestNameNodeProvidedImplementation {
     }
   }
 
-  @Test(timeout=20000)
-  public void testBlockLoad() throws Exception {
+  @Test(timeout=30000)
+  public void testProvidedReporting() throws Exception {
     conf.setClass(ImageWriter.Options.UGI_CLASS,
         SingleUGIResolver.class, UGIResolver.class);
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
-    startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED},
-        null, false);
+    int numDatanodes = 10;
+    startCluster(NNDIRPATH, numDatanodes,
+        new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
+        false);
+    long diskCapacity = 1000;
+    // set the DISK capacity for testing
+    for (DataNode dn: cluster.getDataNodes()) {
+      for (FsVolumeSpi ref : dn.getFSDataset().getFsVolumeReferences()) {
+        if (ref.getStorageType() == StorageType.DISK) {
+          ((FsVolumeImpl) ref).setCapacityForTesting(diskCapacity);
+        }
+      }
+    }
+    // trigger heartbeats to update the capacities
+    cluster.triggerHeartbeats();
+    Thread.sleep(10000);
+    // verify namenode stats
+    FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
+    DatanodeStatistics dnStats = namesystem.getBlockManager()
+        .getDatanodeManager().getDatanodeStatistics();
+
+    // total capacity reported includes only the local volumes and
+    // not the provided capacity
+    assertEquals(diskCapacity * numDatanodes, namesystem.getTotal());
+
+    // total storage used should be equal to the totalProvidedStorage
+    // no capacity should be remaining!
+    assertEquals(providedDataSize, dnStats.getProvidedCapacity());
+    assertEquals(providedDataSize, namesystem.getProvidedCapacityTotal());
+    assertEquals(providedDataSize, dnStats.getStorageTypeStats()
+        .get(StorageType.PROVIDED).getCapacityTotal());
+    assertEquals(providedDataSize, dnStats.getStorageTypeStats()
+        .get(StorageType.PROVIDED).getCapacityUsed());
+
+    // verify datanode stats
+    for (DataNode dn: cluster.getDataNodes()) {
+      for (StorageReport report : dn.getFSDataset()
+          .getStorageReports(namesystem.getBlockPoolId())) {
+        if (report.getStorage().getStorageType() == StorageType.PROVIDED) {
+          assertEquals(providedDataSize, report.getCapacity());
+          assertEquals(providedDataSize, report.getDfsUsed());
+          assertEquals(providedDataSize, report.getBlockPoolUsed());
+          assertEquals(0, report.getNonDfsUsed());
+          assertEquals(0, report.getRemaining());
+        }
+      }
+    }
+
+    DFSClient client = new DFSClient(new InetSocketAddress("localhost",
+        cluster.getNameNodePort()), cluster.getConfiguration(0));
+    BlockManager bm = namesystem.getBlockManager();
+    for (int fileId = 0; fileId < numFiles; fileId++) {
+      String filename = "/" + filePrefix + fileId + fileSuffix;
+      LocatedBlocks locatedBlocks = client.getLocatedBlocks(
+          filename, 0, baseFileLen);
+      for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+        BlockInfo blockInfo =
+            bm.getStoredBlock(locatedBlock.getBlock().getLocalBlock());
+        Iterator<DatanodeStorageInfo> storagesItr = blockInfo.getStorageInfos();
+
+        DatanodeStorageInfo info = storagesItr.next();
+        assertEquals(StorageType.PROVIDED, info.getStorageType());
+        DatanodeDescriptor dnDesc = info.getDatanodeDescriptor();
+        // check the locations that are returned by FSCK have the right name
+        assertEquals(ProvidedStorageMap.ProvidedDescriptor.NETWORK_LOCATION
+            + PATH_SEPARATOR_STR + ProvidedStorageMap.ProvidedDescriptor.NAME,
+            NodeBase.getPath(dnDesc));
+        // no DatanodeStorageInfos should remain
+        assertFalse(storagesItr.hasNext());
+      }
+    }
   }
 
   @Test(timeout=500000)
@@ -250,8 +334,8 @@ public class TestNameNodeProvidedImplementation {
     // make the last Datanode with only DISK
     startCluster(NNDIRPATH, 3, null,
         new StorageType[][] {
-            {StorageType.PROVIDED},
-            {StorageType.PROVIDED},
+            {StorageType.PROVIDED, StorageType.DISK},
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
     // wait for the replication to finish
@@ -308,8 +392,9 @@ public class TestNameNodeProvidedImplementation {
         FsUGIResolver.class, UGIResolver.class);
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
-    startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED},
-        null, false);
+    startCluster(NNDIRPATH, 3,
+        new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
+        false);
     FileSystem fs = cluster.getFileSystem();
     Thread.sleep(2000);
     int count = 0;
@@ -371,7 +456,7 @@ public class TestNameNodeProvidedImplementation {
     return fs.getFileBlockLocations(path, 0, fileLen);
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testClusterWithEmptyImage() throws IOException {
     // start a cluster with 2 datanodes without any provided storage
     startCluster(NNDIRPATH, 2, null,
@@ -404,7 +489,7 @@ public class TestNameNodeProvidedImplementation {
    * Tests setting replication of provided files.
    * @throws Exception
    */
-  @Test
+  @Test(timeout=30000)
   public void testSetReplicationForProvidedFiles() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
@@ -441,14 +526,14 @@ public class TestNameNodeProvidedImplementation {
     getAndCheckBlockLocations(client, filename, newReplication);
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testProvidedDatanodeFailures() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
     startCluster(NNDIRPATH, 3, null,
         new StorageType[][] {
-            {StorageType.PROVIDED},
-            {StorageType.PROVIDED},
+            {StorageType.PROVIDED, StorageType.DISK},
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
 
@@ -511,7 +596,7 @@ public class TestNameNodeProvidedImplementation {
     // 2 Datanodes, 1 PROVIDED and other DISK
     startCluster(NNDIRPATH, 2, null,
         new StorageType[][] {
-            {StorageType.PROVIDED},
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
 
@@ -540,7 +625,7 @@ public class TestNameNodeProvidedImplementation {
     // 2 Datanodes, 1 PROVIDED and other DISK
     startCluster(NNDIRPATH, 2, null,
         new StorageType[][] {
-            {StorageType.PROVIDED},
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
 
@@ -570,7 +655,7 @@ public class TestNameNodeProvidedImplementation {
     }
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testSetClusterID() throws Exception {
     String clusterID = "PROVIDED-CLUSTER";
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
@@ -578,7 +663,7 @@ public class TestNameNodeProvidedImplementation {
     // 2 Datanodes, 1 PROVIDED and other DISK
     startCluster(NNDIRPATH, 2, null,
         new StorageType[][] {
-            {StorageType.PROVIDED},
+            {StorageType.PROVIDED, StorageType.DISK},
             {StorageType.DISK}},
         false);
     NameNode nn = cluster.getNameNode();