@@ -86,6 +87,10 @@ public class MiniDFSCluster {
 
   private static final String NAMESERVICE_ID_PREFIX = "nameserviceId";
   private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
+  /** System property to set the data dir: {@value} */
+  public static final String PROP_TEST_BUILD_DATA = "test.build.data";
+  /** Configuration option to set the data dir: {@value} */
+  public static final String HDFS_MINIDFS_BASEDIR = "hdfs.minidfs.basedir";
 
   static { DefaultMetricsSystem.setMiniClusterMode(true); }
 
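The pair of constants above gives tests two ways to relocate MiniDFSCluster data: a JVM-wide system property and a per-cluster Configuration option. A minimal sketch of how they are meant to be used — the constants are real, but the class name and `/tmp` paths are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class BaseDirExample {                      // illustrative test helper
  static Configuration clusterConf() {
    // JVM-wide default, read through the test.build.data system property:
    System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "/tmp/dfs-tests");
    // Per-cluster override, read from the Configuration by
    // determineDfsBaseDir() (added in a later hunk of this patch):
    Configuration conf = new Configuration();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "/tmp/dfs-tests/case-1");
    return conf;
  }
}
```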
@@ -495,7 +500,7 @@ public class MiniDFSCluster {
       boolean waitSafeMode, boolean setupHostsFile, boolean federation)
     throws IOException {
     this.conf = conf;
-    base_dir = new File(getBaseDirectory());
+    base_dir = new File(determineDfsBaseDir());
     data_dir = new File(base_dir, "data");
     this.federation = federation;
     this.waitSafeMode = waitSafeMode;
@@ -504,7 +509,7 @@ public class MiniDFSCluster {
 
     String rpcEngineName = System.getProperty("hdfs.rpc.engine");
     if (rpcEngineName != null && !"".equals(rpcEngineName)) {
-      System.out.println("HDFS using RPCEngine: "+rpcEngineName);
+      LOG.info("HDFS using RPCEngine: " + rpcEngineName);
       try {
         Class<?> rpcEngine = conf.getClassByName(rpcEngineName);
         setRpcEngine(conf, NamenodeProtocols.class, rpcEngine);
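This hunk and most of those below are the same mechanical change: diagnostics move from System.out.println to the class's existing commons-logging LOG (visible as context in the earlier hunk), so they carry a level and logger name and can be filtered or redirected like the rest of the test output. A sketch of the idiom, with an illustrative class name:

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LoggingExample {                      // illustrative only
  private static final Log LOG = LogFactory.getLog(LoggingExample.class);

  void report(String rpcEngineName) {
    // Unlike System.out.println, this goes through log4j configuration:
    LOG.info("HDFS using RPCEngine: " + rpcEngineName);
  }
}
```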
@@ -858,8 +863,8 @@ public class MiniDFSCluster {
       // Set up datanode address
       setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
       if (manageDfsDirs) {
-        File dir1 = getStorageDir(i, 0);
-        File dir2 = getStorageDir(i, 1);
+        File dir1 = getInstanceStorageDir(i, 0);
+        File dir2 = getInstanceStorageDir(i, 1);
         dir1.mkdirs();
         dir2.mkdirs();
         if (!dir1.isDirectory() || !dir2.isDirectory()) {
@@ -875,17 +880,17 @@ public class MiniDFSCluster {
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }
-      System.out.println("Starting DataNode " + i + " with "
+      LOG.info("Starting DataNode " + i + " with "
                          + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
                          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
       if (hosts != null) {
         dnConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, hosts[i - curDatanodesNum]);
-        System.out.println("Starting DataNode " + i + " with hostname set to: "
+        LOG.info("Starting DataNode " + i + " with hostname set to: "
                            + dnConf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY));
       }
       if (racks != null) {
         String name = hosts[i - curDatanodesNum];
-        System.out.println("Adding node with hostname : " + name + " to rack "+
+        LOG.info("Adding node with hostname : " + name + " to rack " +
                            racks[i-curDatanodesNum]);
         StaticMapping.addNodeToRack(name,
                                     racks[i-curDatanodesNum]);
@@ -903,7 +908,7 @@ public class MiniDFSCluster {
       String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
       if (racks != null) {
         int port = dn.getSelfAddr().getPort();
-        System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
+        LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
                            " to rack " + racks[i-curDatanodesNum]);
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                     racks[i-curDatanodesNum]);
@@ -1099,7 +1104,7 @@ public class MiniDFSCluster {
    * Shutdown all the nodes in the cluster.
    */
   public void shutdown() {
-    System.out.println("Shutting down the Mini HDFS Cluster");
+    LOG.info("Shutting down the Mini HDFS Cluster");
     shutdownDataNodes();
     for (NameNodeInfo nnInfo : nameNodes) {
       NameNode nameNode = nnInfo.nameNode;
@@ -1139,7 +1144,7 @@ public class MiniDFSCluster {
   public synchronized void shutdownNameNode(int nnIndex) {
     NameNode nn = nameNodes[nnIndex].nameNode;
     if (nn != null) {
-      System.out.println("Shutting down the namenode");
+      LOG.info("Shutting down the namenode");
       nn.stop();
       nn.join();
       Configuration conf = nameNodes[nnIndex].conf;
@@ -1183,9 +1188,9 @@ public class MiniDFSCluster {
     nameNodes[nnIndex] = new NameNodeInfo(nn, conf);
     if (waitActive) {
       waitClusterUp();
-      System.out.println("Restarted the namenode");
+      LOG.info("Restarted the namenode");
       waitActive();
-      System.out.println("Cluster is active");
+      LOG.info("Cluster is active");
     }
   }
 
@@ -1261,7 +1266,7 @@ public class MiniDFSCluster {
     }
     DataNodeProperties dnprop = dataNodes.remove(i);
     DataNode dn = dnprop.datanode;
-    System.out.println("MiniDFSCluster Stopping DataNode " +
+    LOG.info("MiniDFSCluster Stopping DataNode " +
                        dn.getMachineName() +
                        " from a total of " + (dataNodes.size() + 1) +
                        " datanodes.");
@@ -1350,7 +1355,7 @@ public class MiniDFSCluster {
     for (int i = dataNodes.size() - 1; i >= 0; i--) {
       if (!restartDataNode(i, keepPort))
         return false;
-      System.out.println("Restarted DataNode " + i);
+      LOG.info("Restarted DataNode " + i);
     }
     return true;
   }
@@ -1377,8 +1382,8 @@ public class MiniDFSCluster {
     } catch (IOException ioe) {
       // This method above should never throw.
       // It only throws IOE since it is exposed via RPC
-      throw new AssertionError("Unexpected IOE thrown: "
-          + StringUtils.stringifyException(ioe));
+      throw (AssertionError)(new AssertionError("Unexpected IOE thrown: "
+          + StringUtils.stringifyException(ioe)).initCause(ioe));
     }
     boolean isUp = false;
     synchronized (this) {
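The rewritten throw is the pre-Java 7 idiom for attaching a cause to an AssertionError: Java 6 has no AssertionError(String, Throwable) constructor, and initCause() returns Throwable, hence the cast. A self-contained sketch, with an illustrative class name:

```java
import java.io.IOException;

public class AssertionCauseExample {               // illustrative only
  static AssertionError wrap(IOException ioe) {
    // initCause() preserves the original exception's stack trace as the
    // cause; it returns Throwable, so cast back before throwing.
    return (AssertionError) new AssertionError(
        "Unexpected IOE thrown: " + ioe.getMessage()).initCause(ioe);
  }
}
```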
@@ -1524,7 +1529,7 @@ public class MiniDFSCluster {
         failedCount++;
         // Cached RPC connection to namenode, if any, is expected to fail once
         if (failedCount > 1) {
-          System.out.println("Tried waitActive() " + failedCount
+          LOG.warn("Tried waitActive() " + failedCount
               + " time(s) and failed, giving up. "
               + StringUtils.stringifyException(e));
           throw e;
@@ -1576,7 +1581,7 @@ public class MiniDFSCluster {
   }
 
   public void formatDataNodeDirs() throws IOException {
-    base_dir = new File(getBaseDirectory());
+    base_dir = new File(determineDfsBaseDir());
     data_dir = new File(base_dir, "data");
     if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
       throw new IOException("Cannot remove data directory: " + data_dir);
@@ -1697,8 +1702,49 @@ public class MiniDFSCluster {
     return data_dir.getAbsolutePath();
   }
 
+  /**
+   * Get the base directory for this MiniDFS instance.
+   * <p/>
+   * Within the MiniDFSCluster class and any subclasses, this method should be
+   * used instead of {@link #getBaseDirectory()}, which doesn't support
+   * configuration-specific base directories.
+   * <p/>
+   * First the Configuration property {@link #HDFS_MINIDFS_BASEDIR} is fetched.
+   * If non-null, this is returned.
+   * If this is null, then {@link #getBaseDirectory()} is called.
+   * @return the base directory for this instance.
+   */
+  protected String determineDfsBaseDir() {
+    String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null);
+    if (dfsdir == null) {
+      dfsdir = getBaseDirectory();
+    }
+    return dfsdir;
+  }
+
+  /**
+   * Get the base directory for any DFS cluster whose configuration does
+   * not explicitly set it. This is done by retrieving the system property
+   * {@link #PROP_TEST_BUILD_DATA} (defaulting to "build/test/data"),
+   * and returning that directory with a subdir of /dfs.
+   * @return a directory for use as a miniDFS filesystem.
+   */
   public static String getBaseDirectory() {
-    return System.getProperty("test.build.data", "build/test/data") + "/dfs/";
+    return System.getProperty(PROP_TEST_BUILD_DATA, "build/test/data") + "/dfs/";
+  }
+
+  /**
+   * Get a storage directory for a datanode in this specific instance of
+   * a MiniCluster.
+   *
+   * @param dnIndex datanode index (starts from 0)
+   * @param dirIndex directory index (0 or 1). Index 0 provides access to the
+   *          first storage directory. Index 1 provides access to the second
+   *          storage directory.
+   * @return Storage directory
+   */
+  public File getInstanceStorageDir(int dnIndex, int dirIndex) {
+    return new File(base_dir, getStorageDirPath(dnIndex, dirIndex));
   }
 
   /**
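determineDfsBaseDir() makes the Configuration option win over the system property, which is what lets two clusters in one JVM live under different directories. A hedged sketch of the effect, assuming the MiniDFSCluster.Builder API present in branches of this vintage and an illustrative path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class PerInstanceDirExample {               // illustrative only
  static MiniDFSCluster start() throws Exception {
    Configuration conf = new Configuration();
    // With the option set, base_dir becomes /tmp/case-a/dfs; without it,
    // the cluster falls back to getBaseDirectory(), i.e.
    // ${test.build.data:-build/test/data}/dfs/.
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "/tmp/case-a/dfs");
    return new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  }
}
```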
@@ -1716,13 +1762,25 @@ public class MiniDFSCluster {
    * @return Storage directory
    */
   public static File getStorageDir(int dnIndex, int dirIndex) {
-    return new File(getBaseDirectory() + "data/data" + (2*dnIndex + 1 + dirIndex));
+    return new File(getBaseDirectory(), getStorageDirPath(dnIndex, dirIndex));
   }
-  
+
   /**
-   * Get current directory corresponding to the datanode
-   * @param storageDir
-   * @return current directory
+   * Calculate the DN instance-specific path for appending to the base dir
+   * to determine the location of the storage of a DN instance in the mini cluster.
+   * @param dnIndex datanode index
+   * @param dirIndex directory index (0 or 1).
+   * @return storage directory path, relative to the base dir
+   */
+  private static String getStorageDirPath(int dnIndex, int dirIndex) {
+    return "data/data" + (2 * dnIndex + 1 + dirIndex);
+  }
+
+  /**
+   * Get current directory corresponding to the datanode as defined in
+   * {@link Storage#STORAGE_DIR_CURRENT}
+   * @param storageDir the storage directory of a datanode.
+   * @return the datanode current directory
    */
   public static String getDNCurrentDir(File storageDir) {
     return storageDir + "/" + Storage.STORAGE_DIR_CURRENT + "/";
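For reference, the arithmetic in getStorageDirPath() preserves the pre-existing 1-based, two-directories-per-datanode layout. A worked example (the class wrapper is illustrative):

```java
public class StorageDirPathExample {               // illustrative only
  public static void main(String[] args) {
    // dnIndex 0 -> data/data1 (dirIndex 0) and data/data2 (dirIndex 1)
    // dnIndex 1 -> data/data3 (dirIndex 0) and data/data4 (dirIndex 1)
    int dnIndex = 1, dirIndex = 0;
    System.out.println("data/data" + (2 * dnIndex + 1 + dirIndex)); // data/data3
  }
}
```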
@@ -1730,8 +1788,8 @@ public class MiniDFSCluster {
 
   /**
    * Get directory corresponding to block pool directory in the datanode
-   * @param storageDir
-   * @return current directory
+   * @param storageDir the storage directory of a datanode.
+   * @return the block pool directory
    */
   public static String getBPDir(File storageDir, String bpid) {
     return getDNCurrentDir(storageDir) + bpid + "/";
@@ -1777,6 +1835,16 @@ public class MiniDFSCluster {
     return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
         blk.getBlockName());
   }
+
+  /**
+   * Shut down a cluster if it is not null.
+   * @param cluster cluster reference or null
+   */
+  public static void shutdownCluster(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
 
   /**
    * Get all files related to a block from all the datanodes
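The null-guarded shutdownCluster() helper exists so teardown code stays safe when cluster startup itself failed. A typical usage sketch — the test body and the use of the Builder API are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class TeardownExample {                     // illustrative only
  void run() throws Exception {
    MiniDFSCluster cluster = null;
    try {
      cluster = new MiniDFSCluster.Builder(new Configuration()).build();
      // ... exercise the cluster ...
    } finally {
      // Safe even if build() threw and cluster is still null.
      MiniDFSCluster.shutdownCluster(cluster);
    }
  }
}
```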