|
@@ -55,6 +55,7 @@ import java.net.InetSocketAddress;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URISyntaxException;
|
|
import java.nio.channels.FileChannel;
|
|
import java.nio.channels.FileChannel;
|
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -90,9 +91,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
|
|
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
|
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
@@ -133,15 +132,11 @@ public class MiniDFSCluster {
|
|
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
|
|
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
|
|
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
|
|
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
|
|
|
|
|
|
- // Changing this default may break some tests that assume it is 2.
|
|
|
|
- private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
|
|
|
|
|
|
+ // Changing this value may break some tests that assume it is 2.
|
|
|
|
+ public static final int DIRS_PER_DATANODE = 2;
|
|
|
|
|
|
static { DefaultMetricsSystem.setMiniClusterMode(true); }
|
|
static { DefaultMetricsSystem.setMiniClusterMode(true); }
|
|
|
|
|
|
- public int getStoragesPerDatanode() {
|
|
|
|
- return storagesPerDatanode;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Class to construct instances of MiniDFSClusters with specific options.
|
|
* Class to construct instances of MiniDFSClusters with specific options.
|
|
*/
|
|
*/
|
|
@@ -151,8 +146,6 @@ public class MiniDFSCluster {
|
|
private final Configuration conf;
|
|
private final Configuration conf;
|
|
private int numDataNodes = 1;
|
|
private int numDataNodes = 1;
|
|
private StorageType[][] storageTypes = null;
|
|
private StorageType[][] storageTypes = null;
|
|
- private StorageType[] storageTypes1D = null;
|
|
|
|
- private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
|
|
|
private boolean format = true;
|
|
private boolean format = true;
|
|
private boolean manageNameDfsDirs = true;
|
|
private boolean manageNameDfsDirs = true;
|
|
private boolean manageNameDfsSharedDirs = true;
|
|
private boolean manageNameDfsSharedDirs = true;
|
|
@@ -163,8 +156,6 @@ public class MiniDFSCluster {
|
|
private String[] racks = null;
|
|
private String[] racks = null;
|
|
private String [] hosts = null;
|
|
private String [] hosts = null;
|
|
private long [] simulatedCapacities = null;
|
|
private long [] simulatedCapacities = null;
|
|
- private long [][] storageCapacities = null;
|
|
|
|
- private long [] storageCapacities1D = null;
|
|
|
|
private String clusterId = null;
|
|
private String clusterId = null;
|
|
private boolean waitSafeMode = true;
|
|
private boolean waitSafeMode = true;
|
|
private boolean setupHostsFile = false;
|
|
private boolean setupHostsFile = false;
|
|
@@ -202,21 +193,17 @@ public class MiniDFSCluster {
|
|
return this;
|
|
return this;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Default: DEFAULT_STORAGES_PER_DATANODE
|
|
|
|
- */
|
|
|
|
- public Builder storagesPerDatanode(int numStorages) {
|
|
|
|
- this.storagesPerDatanode = numStorages;
|
|
|
|
- return this;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Set the same storage type configuration for each datanode.
|
|
* Set the same storage type configuration for each datanode.
|
|
* If storageTypes is uninitialized or passed null then
|
|
* If storageTypes is uninitialized or passed null then
|
|
* StorageType.DEFAULT is used.
|
|
* StorageType.DEFAULT is used.
|
|
*/
|
|
*/
|
|
public Builder storageTypes(StorageType[] types) {
|
|
public Builder storageTypes(StorageType[] types) {
|
|
- this.storageTypes1D = types;
|
|
|
|
|
|
+ assert types.length == DIRS_PER_DATANODE;
|
|
|
|
+ this.storageTypes = new StorageType[numDataNodes][types.length];
|
|
|
|
+ for (int i = 0; i < numDataNodes; ++i) {
|
|
|
|
+ this.storageTypes[i] = types;
|
|
|
|
+ }
|
|
return this;
|
|
return this;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -230,26 +217,6 @@ public class MiniDFSCluster {
|
|
return this;
|
|
return this;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Set the same storage capacity configuration for each datanode.
|
|
|
|
- * If storageTypes is uninitialized or passed null then
|
|
|
|
- * StorageType.DEFAULT is used.
|
|
|
|
- */
|
|
|
|
- public Builder storageCapacities(long[] capacities) {
|
|
|
|
- this.storageCapacities1D = capacities;
|
|
|
|
- return this;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Set custom storage capacity configuration for each datanode.
|
|
|
|
- * If storageCapacities is uninitialized or passed null then
|
|
|
|
- * capacity is limited by available disk space.
|
|
|
|
- */
|
|
|
|
- public Builder storageCapacities(long[][] capacities) {
|
|
|
|
- this.storageCapacities = capacities;
|
|
|
|
- return this;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Default: true
|
|
* Default: true
|
|
*/
|
|
*/
|
|
@@ -323,11 +290,6 @@ public class MiniDFSCluster {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Use SimulatedFSDataset and limit the capacity of each DN per
|
|
|
|
- * the values passed in val.
|
|
|
|
- *
|
|
|
|
- * For limiting the capacity of volumes with real storage, see
|
|
|
|
- * {@link FsVolumeImpl#setCapacityForTesting}
|
|
|
|
* Default: null
|
|
* Default: null
|
|
*/
|
|
*/
|
|
public Builder simulatedCapacities(long[] val) {
|
|
public Builder simulatedCapacities(long[] val) {
|
|
@@ -430,28 +392,7 @@ public class MiniDFSCluster {
|
|
LOG.info("starting cluster: numNameNodes=" + numNameNodes
|
|
LOG.info("starting cluster: numNameNodes=" + numNameNodes
|
|
+ ", numDataNodes=" + builder.numDataNodes);
|
|
+ ", numDataNodes=" + builder.numDataNodes);
|
|
nameNodes = new NameNodeInfo[numNameNodes];
|
|
nameNodes = new NameNodeInfo[numNameNodes];
|
|
- this.storagesPerDatanode = builder.storagesPerDatanode;
|
|
|
|
-
|
|
|
|
- // Duplicate the storageType setting for each DN.
|
|
|
|
- if (builder.storageTypes == null && builder.storageTypes1D != null) {
|
|
|
|
- assert builder.storageTypes1D.length == storagesPerDatanode;
|
|
|
|
- builder.storageTypes = new StorageType[builder.numDataNodes][storagesPerDatanode];
|
|
|
|
|
|
|
|
- for (int i = 0; i < builder.numDataNodes; ++i) {
|
|
|
|
- builder.storageTypes[i] = builder.storageTypes1D;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Duplicate the storageCapacity setting for each DN.
|
|
|
|
- if (builder.storageCapacities == null && builder.storageCapacities1D != null) {
|
|
|
|
- assert builder.storageCapacities1D.length == storagesPerDatanode;
|
|
|
|
- builder.storageCapacities = new long[builder.numDataNodes][storagesPerDatanode];
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < builder.numDataNodes; ++i) {
|
|
|
|
- builder.storageCapacities[i] = builder.storageCapacities1D;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
initMiniDFSCluster(builder.conf,
|
|
initMiniDFSCluster(builder.conf,
|
|
builder.numDataNodes,
|
|
builder.numDataNodes,
|
|
builder.storageTypes,
|
|
builder.storageTypes,
|
|
@@ -464,7 +405,6 @@ public class MiniDFSCluster {
|
|
builder.dnOption,
|
|
builder.dnOption,
|
|
builder.racks,
|
|
builder.racks,
|
|
builder.hosts,
|
|
builder.hosts,
|
|
- builder.storageCapacities,
|
|
|
|
builder.simulatedCapacities,
|
|
builder.simulatedCapacities,
|
|
builder.clusterId,
|
|
builder.clusterId,
|
|
builder.waitSafeMode,
|
|
builder.waitSafeMode,
|
|
@@ -507,7 +447,6 @@ public class MiniDFSCluster {
|
|
private boolean waitSafeMode = true;
|
|
private boolean waitSafeMode = true;
|
|
private boolean federation;
|
|
private boolean federation;
|
|
private boolean checkExitOnShutdown = true;
|
|
private boolean checkExitOnShutdown = true;
|
|
- protected final int storagesPerDatanode;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* A unique instance identifier for the cluster. This
|
|
* A unique instance identifier for the cluster. This
|
|
@@ -546,7 +485,6 @@ public class MiniDFSCluster {
|
|
*/
|
|
*/
|
|
public MiniDFSCluster() {
|
|
public MiniDFSCluster() {
|
|
nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
|
|
nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
|
|
- storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
|
|
|
synchronized (MiniDFSCluster.class) {
|
|
synchronized (MiniDFSCluster.class) {
|
|
instanceId = instanceCount++;
|
|
instanceId = instanceCount++;
|
|
}
|
|
}
|
|
@@ -721,12 +659,11 @@ public class MiniDFSCluster {
|
|
String[] racks, String hosts[],
|
|
String[] racks, String hosts[],
|
|
long[] simulatedCapacities) throws IOException {
|
|
long[] simulatedCapacities) throws IOException {
|
|
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
|
|
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
|
|
- this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
|
|
|
|
initMiniDFSCluster(conf, numDataNodes, null, format,
|
|
initMiniDFSCluster(conf, numDataNodes, null, format,
|
|
- manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
|
|
|
|
- operation, null, racks, hosts,
|
|
|
|
- null, simulatedCapacities, null, true, false,
|
|
|
|
- MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
|
|
|
|
|
|
+ manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
|
|
|
|
+ operation, null, racks, hosts,
|
|
|
|
+ simulatedCapacities, null, true, false,
|
|
|
|
+ MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
|
|
}
|
|
}
|
|
|
|
|
|
private void initMiniDFSCluster(
|
|
private void initMiniDFSCluster(
|
|
@@ -735,8 +672,7 @@ public class MiniDFSCluster {
|
|
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
|
|
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
|
|
boolean manageDataDfsDirs, StartupOption startOpt,
|
|
boolean manageDataDfsDirs, StartupOption startOpt,
|
|
StartupOption dnStartOpt, String[] racks,
|
|
StartupOption dnStartOpt, String[] racks,
|
|
- String[] hosts,
|
|
|
|
- long[][] storageCapacities, long[] simulatedCapacities, String clusterId,
|
|
|
|
|
|
+ String[] hosts, long[] simulatedCapacities, String clusterId,
|
|
boolean waitSafeMode, boolean setupHostsFile,
|
|
boolean waitSafeMode, boolean setupHostsFile,
|
|
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
|
|
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
|
|
boolean checkDataNodeAddrConfig,
|
|
boolean checkDataNodeAddrConfig,
|
|
@@ -810,7 +746,7 @@ public class MiniDFSCluster {
|
|
// Start the DataNodes
|
|
// Start the DataNodes
|
|
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
|
|
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
|
|
dnStartOpt != null ? dnStartOpt : startOpt,
|
|
dnStartOpt != null ? dnStartOpt : startOpt,
|
|
- racks, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
|
|
|
|
|
|
+ racks, hosts, simulatedCapacities, setupHostsFile,
|
|
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
|
|
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
|
|
waitClusterUp();
|
|
waitClusterUp();
|
|
//make sure ProxyUsers uses the latest conf
|
|
//make sure ProxyUsers uses the latest conf
|
|
@@ -1185,8 +1121,8 @@ public class MiniDFSCluster {
|
|
|
|
|
|
String makeDataNodeDirs(int dnIndex, StorageType[] storageTypes) throws IOException {
|
|
String makeDataNodeDirs(int dnIndex, StorageType[] storageTypes) throws IOException {
|
|
StringBuilder sb = new StringBuilder();
|
|
StringBuilder sb = new StringBuilder();
|
|
- assert storageTypes == null || storageTypes.length == storagesPerDatanode;
|
|
|
|
- for (int j = 0; j < storagesPerDatanode; ++j) {
|
|
|
|
|
|
+ assert storageTypes == null || storageTypes.length == DIRS_PER_DATANODE;
|
|
|
|
+ for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
|
|
File dir = getInstanceStorageDir(dnIndex, j);
|
|
File dir = getInstanceStorageDir(dnIndex, j);
|
|
dir.mkdirs();
|
|
dir.mkdirs();
|
|
if (!dir.isDirectory()) {
|
|
if (!dir.isDirectory()) {
|
|
@@ -1262,7 +1198,7 @@ public class MiniDFSCluster {
|
|
long[] simulatedCapacities,
|
|
long[] simulatedCapacities,
|
|
boolean setupHostsFile) throws IOException {
|
|
boolean setupHostsFile) throws IOException {
|
|
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
|
|
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
|
|
- null, simulatedCapacities, setupHostsFile, false, false, null);
|
|
|
|
|
|
+ simulatedCapacities, setupHostsFile, false, false, null);
|
|
}
|
|
}
|
|
|
|
|
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|
@@ -1272,7 +1208,7 @@ public class MiniDFSCluster {
|
|
boolean setupHostsFile,
|
|
boolean setupHostsFile,
|
|
boolean checkDataNodeAddrConfig) throws IOException {
|
|
boolean checkDataNodeAddrConfig) throws IOException {
|
|
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
|
|
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
|
|
- null, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
|
|
|
|
|
|
+ simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1306,15 +1242,12 @@ public class MiniDFSCluster {
|
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
|
|
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
|
|
String[] racks, String[] hosts,
|
|
String[] racks, String[] hosts,
|
|
- long[][] storageCapacities,
|
|
|
|
long[] simulatedCapacities,
|
|
long[] simulatedCapacities,
|
|
boolean setupHostsFile,
|
|
boolean setupHostsFile,
|
|
boolean checkDataNodeAddrConfig,
|
|
boolean checkDataNodeAddrConfig,
|
|
boolean checkDataNodeHostConfig,
|
|
boolean checkDataNodeHostConfig,
|
|
Configuration[] dnConfOverlays) throws IOException {
|
|
Configuration[] dnConfOverlays) throws IOException {
|
|
- assert storageCapacities == null || simulatedCapacities == null;
|
|
|
|
assert storageTypes == null || storageTypes.length == numDataNodes;
|
|
assert storageTypes == null || storageTypes.length == numDataNodes;
|
|
- assert storageCapacities == null || storageCapacities.length == numDataNodes;
|
|
|
|
|
|
|
|
if (operation == StartupOption.RECOVER) {
|
|
if (operation == StartupOption.RECOVER) {
|
|
return;
|
|
return;
|
|
@@ -1367,7 +1300,7 @@ public class MiniDFSCluster {
|
|
operation != StartupOption.ROLLBACK) ?
|
|
operation != StartupOption.ROLLBACK) ?
|
|
null : new String[] {operation.getName()};
|
|
null : new String[] {operation.getName()};
|
|
|
|
|
|
- DataNode[] dns = new DataNode[numDataNodes];
|
|
|
|
|
|
+
|
|
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
|
|
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
|
|
Configuration dnConf = new HdfsConfiguration(conf);
|
|
Configuration dnConf = new HdfsConfiguration(conf);
|
|
if (dnConfOverlays != null) {
|
|
if (dnConfOverlays != null) {
|
|
@@ -1458,24 +1391,10 @@ public class MiniDFSCluster {
|
|
dn.runDatanodeDaemon();
|
|
dn.runDatanodeDaemon();
|
|
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs,
|
|
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs,
|
|
secureResources, dn.getIpcPort()));
|
|
secureResources, dn.getIpcPort()));
|
|
- dns[i - curDatanodesNum] = dn;
|
|
|
|
}
|
|
}
|
|
curDatanodesNum += numDataNodes;
|
|
curDatanodesNum += numDataNodes;
|
|
this.numDataNodes += numDataNodes;
|
|
this.numDataNodes += numDataNodes;
|
|
waitActive();
|
|
waitActive();
|
|
-
|
|
|
|
- if (storageCapacities != null) {
|
|
|
|
- for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
|
|
|
|
- List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
|
|
|
|
- assert storageCapacities[i].length == storagesPerDatanode;
|
|
|
|
- assert volumes.size() == storagesPerDatanode;
|
|
|
|
-
|
|
|
|
- for (int j = 0; j < volumes.size(); ++j) {
|
|
|
|
- FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
|
|
|
|
- volume.setCapacityForTesting(storageCapacities[i][j]);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|