|
@@ -105,6 +105,7 @@ public class MiniDFSCluster {
|
|
|
private int numDataNodes = 1;
|
|
|
private boolean format = true;
|
|
|
private boolean manageNameDfsDirs = true;
|
|
|
+ private boolean enableManagedDfsDirsRedundancy = true;
|
|
|
private boolean manageDataDfsDirs = true;
|
|
|
private StartupOption option = null;
|
|
|
private String[] racks = null;
|
|
@@ -176,6 +177,14 @@ public class MiniDFSCluster {
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Default: true
|
|
|
+ */
|
|
|
+ public Builder enableManagedDfsDirsRedundancy(boolean val) {
|
|
|
+ this.enableManagedDfsDirsRedundancy = val;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Default: true
|
|
|
*/
|
|
@@ -265,6 +274,7 @@ public class MiniDFSCluster {
|
|
|
builder.numDataNodes,
|
|
|
builder.format,
|
|
|
builder.manageNameDfsDirs,
|
|
|
+ builder.enableManagedDfsDirsRedundancy,
|
|
|
builder.manageDataDfsDirs,
|
|
|
builder.option,
|
|
|
builder.racks,
|
|
@@ -488,13 +498,14 @@ public class MiniDFSCluster {
|
|
|
long[] simulatedCapacities) throws IOException {
|
|
|
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
|
|
|
initMiniDFSCluster(nameNodePort, 0, conf, numDataNodes, format,
|
|
|
- manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
|
|
|
+ manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts,
|
|
|
simulatedCapacities, null, true, false, false);
|
|
|
}
|
|
|
|
|
|
private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
|
|
|
Configuration conf,
|
|
|
int numDataNodes, boolean format, boolean manageNameDfsDirs,
|
|
|
+ boolean enableManagedDfsDirsRedundancy,
|
|
|
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
|
|
|
String[] hosts, long[] simulatedCapacities, String clusterId,
|
|
|
boolean waitSafeMode, boolean setupHostsFile, boolean federation)
|
|
@@ -544,7 +555,7 @@ public class MiniDFSCluster {
|
|
|
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:"
|
|
|
+ nameNodeHttpPort);
|
|
|
NameNode nn = createNameNode(0, conf, numDataNodes, manageNameDfsDirs,
|
|
|
- format, operation, clusterId);
|
|
|
+ enableManagedDfsDirsRedundancy, format, operation, clusterId);
|
|
|
nameNodes[0] = new NameNodeInfo(nn, conf);
|
|
|
FileSystem.setDefaultUri(conf, getURI(0));
|
|
|
} else {
|
|
@@ -554,8 +565,8 @@ public class MiniDFSCluster {
|
|
|
}
|
|
|
}
|
|
|
initFederationConf(conf, nameserviceIds, numDataNodes, nameNodePort);
|
|
|
- createFederationNamenodes(conf, nameserviceIds, manageNameDfsDirs, format,
|
|
|
- operation, clusterId);
|
|
|
+ createFederationNamenodes(conf, nameserviceIds, manageNameDfsDirs,
|
|
|
+ enableManagedDfsDirsRedundancy, format, operation, clusterId);
|
|
|
}
|
|
|
|
|
|
if (format) {
|
|
@@ -603,27 +614,36 @@ public class MiniDFSCluster {
|
|
|
|
|
|
private void createFederationNamenodes(Configuration conf,
|
|
|
Collection<String> nameserviceIds, boolean manageNameDfsDirs,
|
|
|
+ boolean enableManagedDfsDirsRedundancy,
|
|
|
boolean format, StartupOption operation, String clusterId)
|
|
|
throws IOException {
|
|
|
// Create namenodes in the cluster
|
|
|
int nnCounter = 0;
|
|
|
for (String nameserviceId : nameserviceIds) {
|
|
|
createFederatedNameNode(nnCounter++, conf, numDataNodes, manageNameDfsDirs,
|
|
|
- format, operation, clusterId, nameserviceId);
|
|
|
+ enableManagedDfsDirsRedundancy, format, operation, clusterId, nameserviceId);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private NameNode createNameNode(int nnIndex, Configuration conf,
|
|
|
- int numDataNodes, boolean manageNameDfsDirs, boolean format,
|
|
|
- StartupOption operation, String clusterId)
|
|
|
+ int numDataNodes, boolean manageNameDfsDirs,
|
|
|
+ boolean enableManagedDfsDirsRedundancy,
|
|
|
+ boolean format, StartupOption operation, String clusterId)
|
|
|
throws IOException {
|
|
|
if (manageNameDfsDirs) {
|
|
|
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
|
|
|
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
|
|
|
- conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
|
|
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
|
|
|
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
|
|
|
+ if (enableManagedDfsDirsRedundancy) {
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
|
|
|
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
|
|
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
|
|
|
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
|
|
|
+ } else {
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).toString());
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
|
|
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).toString());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Format and clean out DataNode directories
|
|
@@ -643,12 +663,13 @@ public class MiniDFSCluster {
|
|
|
}
|
|
|
|
|
|
private void createFederatedNameNode(int nnIndex, Configuration conf,
|
|
|
- int numDataNodes, boolean manageNameDfsDirs, boolean format,
|
|
|
+ int numDataNodes, boolean manageNameDfsDirs,
|
|
|
+ boolean enableManagedDfsDirsRedundancy, boolean format,
|
|
|
StartupOption operation, String clusterId, String nameserviceId)
|
|
|
throws IOException {
|
|
|
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
|
|
|
NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs,
|
|
|
- format, operation, clusterId);
|
|
|
+ enableManagedDfsDirsRedundancy, format, operation, clusterId);
|
|
|
conf.set(DFSUtil.getNameServiceIdKey(
|
|
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode
|
|
|
.getHostPortString(nn.getNameNodeAddress()));
|
|
@@ -1919,7 +1940,7 @@ public class MiniDFSCluster {
|
|
|
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIds);
|
|
|
|
|
|
initFederatedNamenodeAddress(conf, nameserviceId, namenodePort);
|
|
|
- createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, null,
|
|
|
+ createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, true, null,
|
|
|
null, nameserviceId);
|
|
|
|
|
|
// Refresh datanodes with the newly started namenode
|