|
@@ -68,9 +68,10 @@ public class DataNodeCluster {
|
|
|
static String dataNodeDirs = DATANODE_DIRS;
|
|
|
static final String USAGE =
|
|
|
"Usage: datanodecluster " +
|
|
|
- " -n <numDataNodes> " +
|
|
|
+ " -n <numDataNodes> " +
|
|
|
+ " -bpid <bpid>" +
|
|
|
" [-racks <numRacks>] " +
|
|
|
- " [-simulated] " +
|
|
|
+ " [-simulated [<simulatedCapacityPerDn>]] " +
|
|
|
" [-inject startingBlockId numBlocksPerDN]" +
|
|
|
" [-r replicationFactorForInjectedBlocks]" +
|
|
|
" [-d dataNodeDirs]\n" +
|
|
@@ -91,7 +92,7 @@ public class DataNodeCluster {
|
|
|
printUsageExit();
|
|
|
}
|
|
|
|
|
|
- public static void main(String[] args) {
|
|
|
+ public static void main(String[] args) throws InterruptedException {
|
|
|
int numDataNodes = 0;
|
|
|
int numRacks = 0;
|
|
|
boolean inject = false;
|
|
@@ -99,6 +100,8 @@ public class DataNodeCluster {
|
|
|
int numBlocksPerDNtoInject = 0;
|
|
|
int replication = 1;
|
|
|
boolean checkDataNodeAddrConfig = false;
|
|
|
+ long simulatedCapacityPerDn = SimulatedFSDataset.DEFAULT_CAPACITY;
|
|
|
+ String bpid = null;
|
|
|
|
|
|
Configuration conf = new HdfsConfiguration();
|
|
|
|
|
@@ -115,7 +118,7 @@ public class DataNodeCluster {
|
|
|
numRacks = Integer.parseInt(args[i]);
|
|
|
} else if (args[i].equals("-r")) {
|
|
|
if (++i >= args.length || args[i].startsWith("-")) {
|
|
|
- printUsageExit("Missing replicaiton factor");
|
|
|
+ printUsageExit("Missing replication factor");
|
|
|
}
|
|
|
replication = Integer.parseInt(args[i]);
|
|
|
} else if (args[i].equals("-d")) {
|
|
@@ -125,6 +128,14 @@ public class DataNodeCluster {
|
|
|
dataNodeDirs = args[i];
|
|
|
} else if (args[i].equals("-simulated")) {
|
|
|
SimulatedFSDataset.setFactory(conf);
|
|
|
+ if ((i+1) < args.length && !args[i+1].startsWith("-")) {
|
|
|
+ simulatedCapacityPerDn = Long.parseLong(args[++i]);
|
|
|
+ }
|
|
|
+ } else if (args[i].equals("-bpid")) {
|
|
|
+ if (++i >= args.length || args[i].startsWith("-")) {
|
|
|
+ printUsageExit("Missing blockpoolid parameter");
|
|
|
+ }
|
|
|
+ bpid = args[i];
|
|
|
} else if (args[i].equals("-inject")) {
|
|
|
if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) {
|
|
|
System.out.print("-inject is valid only for simulated");
|
|
@@ -153,6 +164,9 @@ public class DataNodeCluster {
|
|
|
printUsageExit("Replication must be less than or equal to numDataNodes");
|
|
|
|
|
|
}
|
|
|
+ if (bpid == null) {
|
|
|
+ printUsageExit("BlockPoolId must be provided");
|
|
|
+ }
|
|
|
String nameNodeAdr = FileSystem.getDefaultUri(conf).getAuthority();
|
|
|
if (nameNodeAdr == null) {
|
|
|
System.out.println("No name node address and port in config");
|
|
@@ -162,9 +176,14 @@ public class DataNodeCluster {
|
|
|
System.out.println("Starting " + numDataNodes +
|
|
|
(simulated ? " Simulated " : " ") +
|
|
|
" Data Nodes that will connect to Name Node at " + nameNodeAdr);
|
|
|
-
|
|
|
+
|
|
|
System.setProperty("test.build.data", dataNodeDirs);
|
|
|
|
|
|
+ long simulatedCapacities[] = new long[numDataNodes];
|
|
|
+ for (int i = 0; i < numDataNodes; ++i) {
|
|
|
+ simulatedCapacities[i] = simulatedCapacityPerDn;
|
|
|
+ }
|
|
|
+
|
|
|
MiniDFSCluster mc = new MiniDFSCluster();
|
|
|
try {
|
|
|
mc.formatDataNodeDirs();
|
|
@@ -182,13 +201,12 @@ public class DataNodeCluster {
|
|
|
//rack4DataNode[i] = racks[i%numRacks];
|
|
|
rack4DataNode[i] = rackPrefix + "-" + i%numRacks;
|
|
|
System.out.println("Data Node " + i + " using " + rack4DataNode[i]);
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
try {
|
|
|
mc.startDataNodes(conf, numDataNodes, true, StartupOption.REGULAR,
|
|
|
- rack4DataNode, null, null, false, checkDataNodeAddrConfig);
|
|
|
+ rack4DataNode, null, simulatedCapacities, false, checkDataNodeAddrConfig);
|
|
|
+ Thread.sleep(10*1000); // Give the DN some time to connect to NN and init storage directories.
|
|
|
if (inject) {
|
|
|
long blockSize = 10;
|
|
|
System.out.println("Injecting " + numBlocksPerDNtoInject +
|
|
@@ -203,7 +221,7 @@ public class DataNodeCluster {
|
|
|
}
|
|
|
for (int i = 1; i <= replication; ++i) {
|
|
|
// inject blocks for dn_i into dn_i and replica in dn_i's neighbors
|
|
|
- mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
|
|
|
+ mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks), bpid);
|
|
|
System.out.println("Injecting blocks of dn " + i_dn + " into dn" +
|
|
|
((i_dn + i- 1)% numDataNodes));
|
|
|
}
|