|
@@ -64,6 +64,8 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
+import com.google.common.collect.ArrayListMultimap;
|
|
|
+import com.google.common.collect.Multimap;
|
|
|
import com.google.common.base.Supplier;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -488,7 +490,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
final int numNameNodes = builder.nnTopology.countNameNodes();
|
|
|
LOG.info("starting cluster: numNameNodes=" + numNameNodes
|
|
|
+ ", numDataNodes=" + builder.numDataNodes);
|
|
|
- nameNodes = new NameNodeInfo[numNameNodes];
|
|
|
+
|
|
|
this.storagesPerDatanode = builder.storagesPerDatanode;
|
|
|
|
|
|
// Duplicate the storageType setting for each DN.
|
|
@@ -559,7 +561,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
}
|
|
|
|
|
|
private Configuration conf;
|
|
|
- private NameNodeInfo[] nameNodes;
|
|
|
+ private Multimap<String, NameNodeInfo> namenodes = ArrayListMultimap.create();
|
|
|
protected int numDataNodes;
|
|
|
protected final ArrayList<DataNodeProperties> dataNodes =
|
|
|
new ArrayList<DataNodeProperties>();
|
|
@@ -585,10 +587,10 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Stores the information related to a namenode in the cluster
|
|
|
*/
|
|
|
public static class NameNodeInfo {
|
|
|
- final NameNode nameNode;
|
|
|
- final Configuration conf;
|
|
|
- final String nameserviceId;
|
|
|
- final String nnId;
|
|
|
+ public NameNode nameNode;
|
|
|
+ Configuration conf;
|
|
|
+ String nameserviceId;
|
|
|
+ String nnId;
|
|
|
StartupOption startOpt;
|
|
|
NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
|
|
|
StartupOption startOpt, Configuration conf) {
|
|
@@ -617,7 +619,6 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* without a name node (ie when the name node is started elsewhere).
|
|
|
*/
|
|
|
public MiniDFSCluster() {
|
|
|
- nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
|
|
|
storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
|
|
synchronized (MiniDFSCluster.class) {
|
|
|
instanceId = instanceCount++;
|
|
@@ -792,7 +793,6 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
StartupOption operation,
|
|
|
String[] racks, String hosts[],
|
|
|
long[] simulatedCapacities) throws IOException {
|
|
|
- this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
|
|
|
this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
|
|
initMiniDFSCluster(conf, numDataNodes, null, format,
|
|
|
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
|
|
@@ -883,7 +883,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
createNameNodesAndSetConf(
|
|
|
nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
|
|
|
enableManagedDfsDirsRedundancy,
|
|
|
- format, startOpt, clusterId, conf);
|
|
|
+ format, startOpt, clusterId);
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.error("IOE creating namenodes. Permissions dump:\n" +
|
|
|
createPermissionsDiagnosisString(data_dir), ioe);
|
|
@@ -915,9 +915,9 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for (NameNodeInfo nn : nameNodes) {
|
|
|
+ for (NameNodeInfo nn : namenodes.values()) {
|
|
|
Configuration nnConf = nn.conf;
|
|
|
- for (NameNodeInfo nnInfo : nameNodes) {
|
|
|
+ for (NameNodeInfo nnInfo : namenodes.values()) {
|
|
|
if (nn.equals(nnInfo)) {
|
|
|
continue;
|
|
|
}
|
|
@@ -975,7 +975,125 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
|
|
|
boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
|
|
|
boolean enableManagedDfsDirsRedundancy, boolean format,
|
|
|
+ StartupOption operation, String clusterId) throws IOException {
|
|
|
+ // do the basic namenode configuration
|
|
|
+ configureNameNodes(nnTopology, federation, conf);
|
|
|
+
|
|
|
+ int nnCounter = 0;
|
|
|
+ int nsCounter = 0;
|
|
|
+ // configure each NS independently
|
|
|
+ for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
|
|
+ configureNameService(nameservice, nsCounter++, manageNameDfsSharedDirs,
|
|
|
+ manageNameDfsDirs, enableManagedDfsDirsRedundancy,
|
|
|
+ format, operation, clusterId, nnCounter);
|
|
|
+ nnCounter += nameservice.getNNs().size();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Do the rest of the NN configuration for things like shared edits,
|
|
|
+ * as well as directory formatting, etc. for a single nameservice
|
|
|
+ * @param nnCounter the count of the number of namenodes already configured/started. Also,
|
|
|
+ * acts as the <i>index</i> to the next NN to start (since indicies start at 0).
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCounter,
|
|
|
+ boolean manageNameDfsSharedDirs, boolean manageNameDfsDirs, boolean
|
|
|
+ enableManagedDfsDirsRedundancy, boolean format,
|
|
|
StartupOption operation, String clusterId,
|
|
|
+ final int nnCounter) throws IOException{
|
|
|
+ String nsId = nameservice.getId();
|
|
|
+ String lastDefaultFileSystem = null;
|
|
|
+
|
|
|
+ // If HA is enabled on this nameservice, enumerate all the namenodes
|
|
|
+ // in the configuration. Also need to set a shared edits dir
|
|
|
+ int numNNs = nameservice.getNNs().size();
|
|
|
+ if (numNNs > 1 && manageNameDfsSharedDirs) {
|
|
|
+ URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter + numNNs - 1);
|
|
|
+ conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
|
|
|
+ // Clean out the shared edits dir completely, including all subdirectories.
|
|
|
+ FileUtil.fullyDelete(new File(sharedEditsUri));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now format first NN and copy the storage directory from that node to the others.
|
|
|
+ int nnIndex = nnCounter;
|
|
|
+ Collection<URI> prevNNDirs = null;
|
|
|
+ for (NNConf nn : nameservice.getNNs()) {
|
|
|
+ initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
|
|
|
+ manageNameDfsDirs, nnIndex);
|
|
|
+ Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
|
|
|
+ if (format) {
|
|
|
+ // delete the existing namespaces
|
|
|
+ for (URI nameDirUri : namespaceDirs) {
|
|
|
+ File nameDir = new File(nameDirUri);
|
|
|
+ if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
|
|
|
+ throw new IOException("Could not fully delete " + nameDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // delete the checkpoint directories, if they exist
|
|
|
+ Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
|
|
|
+ .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
|
|
|
+ for (URI checkpointDirUri : checkpointDirs) {
|
|
|
+ File checkpointDir = new File(checkpointDirUri);
|
|
|
+ if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
|
|
|
+ throw new IOException("Could not fully delete " + checkpointDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean formatThisOne = format;
|
|
|
+ // if we are looking at not the first NN
|
|
|
+ if (nnIndex++ > nnCounter && format) {
|
|
|
+ // Don't format the second, third, etc NN in an HA setup - that
|
|
|
+ // would result in it having a different clusterID,
|
|
|
+ // block pool ID, etc. Instead, copy the name dirs
|
|
|
+ // from the previous one.
|
|
|
+ formatThisOne = false;
|
|
|
+ assert (null != prevNNDirs);
|
|
|
+ copyNameDirs(prevNNDirs, namespaceDirs, conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (formatThisOne) {
|
|
|
+ // Allow overriding clusterID for specific NNs to test
|
|
|
+ // misconfiguration.
|
|
|
+ if (nn.getClusterId() == null) {
|
|
|
+ StartupOption.FORMAT.setClusterId(clusterId);
|
|
|
+ } else {
|
|
|
+ StartupOption.FORMAT.setClusterId(nn.getClusterId());
|
|
|
+ }
|
|
|
+ DFSTestUtil.formatNameNode(conf);
|
|
|
+ }
|
|
|
+ prevNNDirs = namespaceDirs;
|
|
|
+ }
|
|
|
+
|
|
|
+ // create all the namenodes in the namespace
|
|
|
+ nnIndex = nnCounter;
|
|
|
+ for (NNConf nn : nameservice.getNNs()) {
|
|
|
+ Configuration hdfsConf = new Configuration(conf);
|
|
|
+ initNameNodeConf(hdfsConf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
|
|
|
+ enableManagedDfsDirsRedundancy, nnIndex++);
|
|
|
+ createNameNode(hdfsConf, false, operation,
|
|
|
+ clusterId, nsId, nn.getNnId());
|
|
|
+
|
|
|
+ // Record the last namenode uri
|
|
|
+ lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
|
|
|
+ }
|
|
|
+ if (!federation && lastDefaultFileSystem != null) {
|
|
|
+ // Set the default file system to the actual bind address of NN.
|
|
|
+ conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Do the basic NN configuration for the topology. Does not configure things like the shared
|
|
|
+ * edits directories
|
|
|
+ * @param nnTopology
|
|
|
+ * @param federation
|
|
|
+ * @param conf
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation,
|
|
|
Configuration conf) throws IOException {
|
|
|
Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
|
|
|
"empty NN topology: no namenodes specified!");
|
|
@@ -988,22 +1106,21 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
// NN is started.
|
|
|
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:" + onlyNN.getIpcPort());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
List<String> allNsIds = Lists.newArrayList();
|
|
|
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
|
|
if (nameservice.getId() != null) {
|
|
|
allNsIds.add(nameservice.getId());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
if (!allNsIds.isEmpty()) {
|
|
|
conf.set(DFS_NAMESERVICES, Joiner.on(",").join(allNsIds));
|
|
|
}
|
|
|
-
|
|
|
- int nnCounter = 0;
|
|
|
+
|
|
|
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
|
|
String nsId = nameservice.getId();
|
|
|
- String lastDefaultFileSystem = null;
|
|
|
-
|
|
|
+
|
|
|
Preconditions.checkArgument(
|
|
|
!federation || nsId != null,
|
|
|
"if there is more than one NS, they must have names");
|
|
@@ -1022,83 +1139,10 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
// If HA is enabled on this nameservice, enumerate all the namenodes
|
|
|
// in the configuration. Also need to set a shared edits dir
|
|
|
if (nnIds.size() > 1) {
|
|
|
- conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
|
|
|
- Joiner.on(",").join(nnIds));
|
|
|
- if (manageNameDfsSharedDirs) {
|
|
|
- URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
|
|
|
- conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
|
|
|
- // Clean out the shared edits dir completely, including all subdirectories.
|
|
|
- FileUtil.fullyDelete(new File(sharedEditsUri));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Now format first NN and copy the storage directory from that node to the others.
|
|
|
- int i = 0;
|
|
|
- Collection<URI> prevNNDirs = null;
|
|
|
- int nnCounterForFormat = nnCounter;
|
|
|
- for (NNConf nn : nameservice.getNNs()) {
|
|
|
- initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
|
|
|
- enableManagedDfsDirsRedundancy, nnCounterForFormat);
|
|
|
- Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
|
|
|
- if (format) {
|
|
|
- for (URI nameDirUri : namespaceDirs) {
|
|
|
- File nameDir = new File(nameDirUri);
|
|
|
- if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
|
|
|
- throw new IOException("Could not fully delete " + nameDir);
|
|
|
- }
|
|
|
- }
|
|
|
- Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
|
|
|
- .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
|
|
|
- for (URI checkpointDirUri : checkpointDirs) {
|
|
|
- File checkpointDir = new File(checkpointDirUri);
|
|
|
- if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
|
|
|
- throw new IOException("Could not fully delete " + checkpointDir);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- boolean formatThisOne = format;
|
|
|
- if (format && i++ > 0) {
|
|
|
- // Don't format the second NN in an HA setup - that
|
|
|
- // would result in it having a different clusterID,
|
|
|
- // block pool ID, etc. Instead, copy the name dirs
|
|
|
- // from the first one.
|
|
|
- formatThisOne = false;
|
|
|
- assert (null != prevNNDirs);
|
|
|
- copyNameDirs(prevNNDirs, namespaceDirs, conf);
|
|
|
- }
|
|
|
-
|
|
|
- nnCounterForFormat++;
|
|
|
- if (formatThisOne) {
|
|
|
- // Allow overriding clusterID for specific NNs to test
|
|
|
- // misconfiguration.
|
|
|
- if (nn.getClusterId() == null) {
|
|
|
- StartupOption.FORMAT.setClusterId(clusterId);
|
|
|
- } else {
|
|
|
- StartupOption.FORMAT.setClusterId(nn.getClusterId());
|
|
|
- }
|
|
|
- DFSTestUtil.formatNameNode(conf);
|
|
|
- }
|
|
|
- prevNNDirs = namespaceDirs;
|
|
|
- }
|
|
|
-
|
|
|
- // Start all Namenodes
|
|
|
- for (NNConf nn : nameservice.getNNs()) {
|
|
|
- Configuration hdfsConf = new Configuration(conf);
|
|
|
- initNameNodeConf(hdfsConf, nsId, nn.getNnId(), manageNameDfsDirs,
|
|
|
- enableManagedDfsDirsRedundancy, nnCounter);
|
|
|
- createNameNode(nnCounter, hdfsConf, numDataNodes, false, operation,
|
|
|
- clusterId, nsId, nn.getNnId());
|
|
|
- // Record the last namenode uri
|
|
|
- lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
|
|
|
- nnCounter++;
|
|
|
- }
|
|
|
- if (!federation && lastDefaultFileSystem != null) {
|
|
|
- // Set the default file system to the actual bind address of NN.
|
|
|
- conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
|
|
|
+ conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner
|
|
|
+ .on(",").join(nnIds));
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
|
|
@@ -1112,39 +1156,92 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
}
|
|
|
|
|
|
public NameNodeInfo[] getNameNodeInfos() {
|
|
|
- return this.nameNodes;
|
|
|
+ return this.namenodes.values().toArray(new NameNodeInfo[0]);
|
|
|
}
|
|
|
|
|
|
- private void initNameNodeConf(Configuration conf,
|
|
|
- String nameserviceId, String nnId,
|
|
|
- boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy,
|
|
|
- int nnIndex) throws IOException {
|
|
|
+ /**
|
|
|
+ * @param nsIndex index of the namespace id to check
|
|
|
+ * @return all the namenodes bound to the given namespace index
|
|
|
+ */
|
|
|
+ public NameNodeInfo[] getNameNodeInfos(int nsIndex) {
|
|
|
+ int i = 0;
|
|
|
+ for (String ns : this.namenodes.keys()) {
|
|
|
+ if (i++ == nsIndex) {
|
|
|
+ return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param nameservice id of nameservice to read
|
|
|
+ * @return all the namenodes bound to the given namespace index
|
|
|
+ */
|
|
|
+ public NameNodeInfo[] getNameNodeInfos(String nameservice) {
|
|
|
+ for (String ns : this.namenodes.keys()) {
|
|
|
+ if (nameservice.equals(ns)) {
|
|
|
+ return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
|
|
|
+ boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
|
|
|
+ throws IOException {
|
|
|
if (nameserviceId != null) {
|
|
|
conf.set(DFS_NAMESERVICE_ID, nameserviceId);
|
|
|
}
|
|
|
if (nnId != null) {
|
|
|
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
|
|
|
}
|
|
|
-
|
|
|
if (manageNameDfsDirs) {
|
|
|
if (enableManagedDfsDirsRedundancy) {
|
|
|
- conf.set(DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
|
|
|
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
|
|
|
- conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
|
|
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
|
|
|
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
|
|
|
+ File[] files = getNameNodeDirectory(nsIndex, nnIndex);
|
|
|
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
|
|
|
+ files = getCheckpointDirectory(nsIndex, nnIndex);
|
|
|
+ conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
|
|
|
} else {
|
|
|
- conf.set(DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).
|
|
|
- toString());
|
|
|
- conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
|
|
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).
|
|
|
- toString());
|
|
|
+ File[] files = getNameNodeDirectory(nsIndex, nnIndex);
|
|
|
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]).toString());
|
|
|
+ files = getCheckpointDirectory(nsIndex, nnIndex);
|
|
|
+ conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]).toString());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private File[] getNameNodeDirectory(int nameserviceIndex, int nnIndex) {
|
|
|
+ return getNameNodeDirectory(base_dir, nameserviceIndex, nnIndex);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static File[] getNameNodeDirectory(String base_dir, int nsIndex, int nnIndex) {
|
|
|
+ return getNameNodeDirectory(new File(base_dir), nsIndex, nnIndex);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static File[] getNameNodeDirectory(File base_dir, int nsIndex, int nnIndex) {
|
|
|
+ File[] files = new File[2];
|
|
|
+ files[0] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 1));
|
|
|
+ files[1] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 2));
|
|
|
+ return files;
|
|
|
+ }
|
|
|
+
|
|
|
+ public File[] getCheckpointDirectory(int nsIndex, int nnIndex) {
|
|
|
+ return getCheckpointDirectory(base_dir, nsIndex, nnIndex);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static File[] getCheckpointDirectory(String base_dir, int nsIndex, int nnIndex) {
|
|
|
+ return getCheckpointDirectory(new File(base_dir), nsIndex, nnIndex);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static File[] getCheckpointDirectory(File base_dir, int nsIndex, int nnIndex) {
|
|
|
+ File[] files = new File[2];
|
|
|
+ files[0] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 1));
|
|
|
+ files[1] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 2));
|
|
|
+ return files;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
|
|
|
Configuration dstConf) throws IOException {
|
|
|
URI srcDir = Lists.newArrayList(srcDirs).get(0);
|
|
@@ -1197,10 +1294,8 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
return args;
|
|
|
}
|
|
|
|
|
|
- private void createNameNode(int nnIndex, Configuration hdfsConf,
|
|
|
- int numDataNodes, boolean format, StartupOption operation,
|
|
|
- String clusterId, String nameserviceId,
|
|
|
- String nnId)
|
|
|
+ private void createNameNode(Configuration hdfsConf, boolean format, StartupOption operation,
|
|
|
+ String clusterId, String nameserviceId, String nnId)
|
|
|
throws IOException {
|
|
|
// Format and clean out DataNode directories
|
|
|
if (format) {
|
|
@@ -1237,8 +1332,9 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
copyKeys(hdfsConf, conf, nameserviceId, nnId);
|
|
|
DFSUtil.setGenericConf(hdfsConf, nameserviceId, nnId,
|
|
|
DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
|
|
- nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
|
|
|
+ NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
|
|
|
operation, hdfsConf);
|
|
|
+ namenodes.put(nameserviceId, info);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1254,7 +1350,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
*/
|
|
|
public URI getURI(int nnIndex) {
|
|
|
String hostPort =
|
|
|
- nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString();
|
|
|
+ getNN(nnIndex).nameNode.getNameNodeAddressHostPortString();
|
|
|
URI uri = null;
|
|
|
try {
|
|
|
uri = new URI("hdfs://" + hostPort);
|
|
@@ -1272,9 +1368,21 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* @return Configuration of for the given namenode
|
|
|
*/
|
|
|
public Configuration getConfiguration(int nnIndex) {
|
|
|
- return nameNodes[nnIndex].conf;
|
|
|
+ return getNN(nnIndex).conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ private NameNodeInfo getNN(int nnIndex) {
|
|
|
+ int count = 0;
|
|
|
+ for (NameNodeInfo nn : namenodes.values()) {
|
|
|
+ if (count == nnIndex) {
|
|
|
+ return nn;
|
|
|
+ }
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* wait for the given namenode to get out of safemode.
|
|
|
*/
|
|
@@ -1685,7 +1793,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
|
|
|
- finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
|
|
|
+ finalizeNamenode(getNN(nnIndex).nameNode, getNN(nnIndex).conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1696,7 +1804,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* @throws IllegalStateException if the Namenode is not running.
|
|
|
*/
|
|
|
public void finalizeCluster(Configuration conf) throws Exception {
|
|
|
- for (NameNodeInfo nnInfo : nameNodes) {
|
|
|
+ for (NameNodeInfo nnInfo : namenodes.values()) {
|
|
|
if (nnInfo == null) {
|
|
|
throw new IllegalStateException("Attempting to finalize "
|
|
|
+ "Namenode but it is not running");
|
|
@@ -1704,9 +1812,9 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public int getNumNameNodes() {
|
|
|
- return nameNodes.length;
|
|
|
+ return namenodes.size();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1736,7 +1844,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Gets the NameNode for the index. May be null.
|
|
|
*/
|
|
|
public NameNode getNameNode(int nnIndex) {
|
|
|
- return nameNodes[nnIndex].nameNode;
|
|
|
+ return getNN(nnIndex).nameNode;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1745,11 +1853,11 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
*/
|
|
|
public FSNamesystem getNamesystem() {
|
|
|
checkSingleNameNode();
|
|
|
- return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
|
|
|
+ return NameNodeAdapter.getNamesystem(getNN(0).nameNode);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public FSNamesystem getNamesystem(int nnIndex) {
|
|
|
- return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
|
|
|
+ return NameNodeAdapter.getNamesystem(getNN(nnIndex).nameNode);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1811,14 +1919,14 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* caller supplied port is not necessarily the actual port used.
|
|
|
*/
|
|
|
public int getNameNodePort(int nnIndex) {
|
|
|
- return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
|
|
|
+ return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @return the service rpc port used by the NameNode at the given index.
|
|
|
*/
|
|
|
public int getNameNodeServicePort(int nnIndex) {
|
|
|
- return nameNodes[nnIndex].nameNode.getServiceRpcAddress().getPort();
|
|
|
+ return getNN(nnIndex).nameNode.getServiceRpcAddress().getPort();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1859,7 +1967,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
fileSystems.clear();
|
|
|
}
|
|
|
shutdownDataNodes();
|
|
|
- for (NameNodeInfo nnInfo : nameNodes) {
|
|
|
+ for (NameNodeInfo nnInfo : namenodes.values()) {
|
|
|
if (nnInfo == null) continue;
|
|
|
stopAndJoinNameNode(nnInfo.nameNode);
|
|
|
}
|
|
@@ -1897,7 +2005,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Shutdown all the namenodes.
|
|
|
*/
|
|
|
public synchronized void shutdownNameNodes() {
|
|
|
- for (int i = 0; i < nameNodes.length; i++) {
|
|
|
+ for (int i = 0; i < namenodes.size(); i++) {
|
|
|
shutdownNameNode(i);
|
|
|
}
|
|
|
}
|
|
@@ -1906,11 +2014,13 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Shutdown the namenode at a given index.
|
|
|
*/
|
|
|
public synchronized void shutdownNameNode(int nnIndex) {
|
|
|
- NameNode nn = nameNodes[nnIndex].nameNode;
|
|
|
+ NameNodeInfo info = getNN(nnIndex);
|
|
|
+ NameNode nn = info.nameNode;
|
|
|
if (nn != null) {
|
|
|
stopAndJoinNameNode(nn);
|
|
|
- Configuration conf = nameNodes[nnIndex].conf;
|
|
|
- nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
|
|
|
+ info.nnId = null;
|
|
|
+ info.nameNode = null;
|
|
|
+ info.nameserviceId = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1931,7 +2041,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Restart all namenodes.
|
|
|
*/
|
|
|
public synchronized void restartNameNodes() throws IOException {
|
|
|
- for (int i = 0; i < nameNodes.length; i++) {
|
|
|
+ for (int i = 0; i < namenodes.size(); i++) {
|
|
|
restartNameNode(i, false);
|
|
|
}
|
|
|
waitActive();
|
|
@@ -1967,19 +2077,19 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
*/
|
|
|
public synchronized void restartNameNode(int nnIndex, boolean waitActive,
|
|
|
String... args) throws IOException {
|
|
|
- String nameserviceId = nameNodes[nnIndex].nameserviceId;
|
|
|
- String nnId = nameNodes[nnIndex].nnId;
|
|
|
- StartupOption startOpt = nameNodes[nnIndex].startOpt;
|
|
|
- Configuration conf = nameNodes[nnIndex].conf;
|
|
|
+ NameNodeInfo info = getNN(nnIndex);
|
|
|
+ StartupOption startOpt = info.startOpt;
|
|
|
+
|
|
|
shutdownNameNode(nnIndex);
|
|
|
if (args.length != 0) {
|
|
|
startOpt = null;
|
|
|
} else {
|
|
|
args = createArgs(startOpt);
|
|
|
}
|
|
|
- NameNode nn = NameNode.createNameNode(args, conf);
|
|
|
- nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
|
|
|
- conf);
|
|
|
+
|
|
|
+ NameNode nn = NameNode.createNameNode(args, info.conf);
|
|
|
+ info.nameNode = nn;
|
|
|
+ info.setStartOpt(startOpt);
|
|
|
if (waitActive) {
|
|
|
waitClusterUp();
|
|
|
LOG.info("Restarted the namenode");
|
|
@@ -2343,7 +2453,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* or if waiting for safe mode is disabled.
|
|
|
*/
|
|
|
public boolean isNameNodeUp(int nnIndex) {
|
|
|
- NameNode nameNode = nameNodes[nnIndex].nameNode;
|
|
|
+ NameNode nameNode = getNN(nnIndex).nameNode;
|
|
|
if (nameNode == null) {
|
|
|
return false;
|
|
|
}
|
|
@@ -2361,7 +2471,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Returns true if all the NameNodes are running and is out of Safe Mode.
|
|
|
*/
|
|
|
public boolean isClusterUp() {
|
|
|
- for (int index = 0; index < nameNodes.length; index++) {
|
|
|
+ for (int index = 0; index < namenodes.size(); index++) {
|
|
|
if (!isNameNodeUp(index)) {
|
|
|
return false;
|
|
|
}
|
|
@@ -2391,15 +2501,13 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
checkSingleNameNode();
|
|
|
return getFileSystem(0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Get a client handle to the DFS cluster for the namenode at given index.
|
|
|
*/
|
|
|
public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
|
|
|
- DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
|
|
|
- getURI(nnIndex), nameNodes[nnIndex].conf);
|
|
|
- fileSystems.add(dfs);
|
|
|
- return dfs;
|
|
|
+ return (DistributedFileSystem) addFileSystem(FileSystem.get(getURI(nnIndex),
|
|
|
+ getNN(nnIndex).conf));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2407,17 +2515,20 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* This simulating different threads working on different FileSystem instances.
|
|
|
*/
|
|
|
public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
|
|
|
- FileSystem dfs = FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
|
|
|
- fileSystems.add(dfs);
|
|
|
- return dfs;
|
|
|
+ return addFileSystem(FileSystem.newInstance(getURI(nnIndex), getNN(nnIndex).conf));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private <T extends FileSystem> T addFileSystem(T fs) {
|
|
|
+ fileSystems.add(fs);
|
|
|
+ return fs;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @return a http URL
|
|
|
*/
|
|
|
public String getHttpUri(int nnIndex) {
|
|
|
return "http://"
|
|
|
- + nameNodes[nnIndex].conf
|
|
|
+ + getNN(nnIndex).conf
|
|
|
.get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
|
|
}
|
|
|
|
|
@@ -2426,7 +2537,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
*/
|
|
|
public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException {
|
|
|
String uri = "hftp://"
|
|
|
- + nameNodes[nnIndex].conf
|
|
|
+ + getNN(nnIndex).conf
|
|
|
.get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
|
|
try {
|
|
|
return (HftpFileSystem)FileSystem.get(new URI(uri), conf);
|
|
@@ -2455,14 +2566,14 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Get the directories where the namenode stores its image.
|
|
|
*/
|
|
|
public Collection<URI> getNameDirs(int nnIndex) {
|
|
|
- return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
|
|
|
+ return FSNamesystem.getNamespaceDirs(getNN(nnIndex).conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Get the directories where the namenode stores its edits.
|
|
|
*/
|
|
|
public Collection<URI> getNameEditsDirs(int nnIndex) throws IOException {
|
|
|
- return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
|
|
|
+ return FSNamesystem.getNamespaceEditsDirs(getNN(nnIndex).conf);
|
|
|
}
|
|
|
|
|
|
public void transitionToActive(int nnIndex) throws IOException,
|
|
@@ -2503,11 +2614,12 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
|
|
|
/** Wait until the given namenode gets registration from all the datanodes */
|
|
|
public void waitActive(int nnIndex) throws IOException {
|
|
|
- if (nameNodes.length == 0 || nameNodes[nnIndex] == null
|
|
|
- || nameNodes[nnIndex].nameNode == null) {
|
|
|
+ if (namenodes.size() == 0 || getNN(nnIndex) == null || getNN(nnIndex).nameNode == null) {
|
|
|
return;
|
|
|
}
|
|
|
- InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
|
|
|
+
|
|
|
+ NameNodeInfo info = getNN(nnIndex);
|
|
|
+ InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
|
|
|
assert addr.getPort() != 0;
|
|
|
DFSClient client = new DFSClient(addr, conf);
|
|
|
|
|
@@ -2526,8 +2638,8 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
/** Wait until the given namenode gets first block reports from all the datanodes */
|
|
|
public void waitFirstBRCompleted(int nnIndex, int timeout) throws
|
|
|
IOException, TimeoutException, InterruptedException {
|
|
|
- if (nameNodes.length == 0 || nameNodes[nnIndex] == null
|
|
|
- || nameNodes[nnIndex].nameNode == null) {
|
|
|
+ if (namenodes.size() == 0 || getNN(nnIndex) == null
|
|
|
+ || getNN(nnIndex).nameNode == null) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -2552,7 +2664,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* Wait until the cluster is active and running.
|
|
|
*/
|
|
|
public void waitActive() throws IOException {
|
|
|
- for (int index = 0; index < nameNodes.length; index++) {
|
|
|
+ for (int index = 0; index < namenodes.size(); index++) {
|
|
|
int failedCount = 0;
|
|
|
while (true) {
|
|
|
try {
|
|
@@ -2572,7 +2684,14 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
}
|
|
|
LOG.info("Cluster is active");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ public void printNNs() {
|
|
|
+ for (int i = 0; i < namenodes.size(); i++) {
|
|
|
+ LOG.info("Have namenode " + i + ", info:" + getNN(i));
|
|
|
+ LOG.info(" has namenode: " + getNN(i).nameNode);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
|
|
|
InetSocketAddress addr) {
|
|
|
// If a datanode failed to start, then do not wait
|
|
@@ -3020,7 +3139,7 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
* namenode
|
|
|
*/
|
|
|
private void checkSingleNameNode() {
|
|
|
- if (nameNodes.length != 1) {
|
|
|
+ if (namenodes.size() != 1) {
|
|
|
throw new IllegalArgumentException("Namenode index is needed");
|
|
|
}
|
|
|
}
|
|
@@ -3036,13 +3155,9 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
if(!federation)
|
|
|
throw new IOException("cannot add namenode to non-federated cluster");
|
|
|
|
|
|
- int nnIndex = nameNodes.length;
|
|
|
- int numNameNodes = nameNodes.length + 1;
|
|
|
- NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
|
|
|
- System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
|
|
|
- nameNodes = newlist;
|
|
|
- String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1);
|
|
|
-
|
|
|
+ int nameServiceIndex = namenodes.keys().size();
|
|
|
+ String nameserviceId = NAMESERVICE_ID_PREFIX + (namenodes.keys().size() + 1);
|
|
|
+
|
|
|
String nameserviceIds = conf.get(DFS_NAMESERVICES);
|
|
|
nameserviceIds += "," + nameserviceId;
|
|
|
conf.set(DFS_NAMESERVICES, nameserviceIds);
|
|
@@ -3050,9 +3165,11 @@ public class MiniDFSCluster implements AutoCloseable {
|
|
|
String nnId = null;
|
|
|
initNameNodeAddress(conf, nameserviceId,
|
|
|
new NNConf(nnId).setIpcPort(namenodePort));
|
|
|
- initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex);
|
|
|
- createNameNode(nnIndex, conf, numDataNodes, true, null, null,
|
|
|
- nameserviceId, nnId);
|
|
|
+ // figure out the current number of NNs
|
|
|
+ NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
|
|
|
+ int nnIndex = infos == null ? 0 : infos.length;
|
|
|
+ initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
|
|
|
+ createNameNode(conf, true, null, null, nameserviceId, nnId);
|
|
|
|
|
|
// Refresh datanodes with the newly started namenode
|
|
|
for (DataNodeProperties dn : dataNodes) {
|