@@ -43,10 +43,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
@@ -152,13 +149,11 @@ import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import com.google.common.base.Preconditions;
 
 
 /**********************************************************
@@ -427,13 +422,14 @@ public class DataNode extends Configured
     }
   }
 
-  private synchronized void setClusterId(String cid) throws IOException {
-    if(clusterId != null && !clusterId.equals(cid)) {
-      throw new IOException ("cluster id doesn't match. old cid=" + clusterId
-          + " new cid="+ cid);
+  private synchronized void setClusterId(final String nsCid, final String bpid
+      ) throws IOException {
+    if(clusterId != null && !clusterId.equals(nsCid)) {
+      throw new IOException ("Cluster IDs not matched: dn cid=" + clusterId
+          + " but ns cid="+ nsCid + "; bpid=" + bpid);
     }
     // else
-    clusterId = cid;
+    clusterId = nsCid;
   }
 
   private static String getHostName(Configuration config)
@@ -810,51 +806,22 @@ public class DataNode extends Configured
    */
   void initBlockPool(BPOfferService bpos) throws IOException {
     NamespaceInfo nsInfo = bpos.getNamespaceInfo();
-    Preconditions.checkState(nsInfo != null,
-        "Block pool " + bpos + " should have retrieved " +
-        "its namespace info before calling initBlockPool.");
+    if (nsInfo == null) {
+      throw new IOException("NamespaceInfo not found: Block pool " + bpos
+          + " should have retrieved namespace info before initBlockPool.");
+    }
 
-    String blockPoolId = nsInfo.getBlockPoolID();
-
     // Register the new block pool with the BP manager.
     blockPoolManager.addBlockPool(bpos);
 
-    synchronized (this) {
-      // we do not allow namenode from different cluster to register
-      if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
-        throw new IOException(
-            "cannot register with the namenode because clusterid do not match:"
-            + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
-            ";dn cid=" + clusterId);
-      }
-
-      setClusterId(nsInfo.clusterID);
-    }
-
-    StartupOption startOpt = getStartupOption(conf);
-    assert startOpt != null : "Startup option must be set.";
-
-    boolean simulatedFSDataset = conf.getBoolean(
-        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
-
-    if (!simulatedFSDataset) {
-      // read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
-          dataDirs, startOpt);
-      StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
-      LOG.info("setting up storage: nsid=" +
-          bpStorage.getNamespaceID() + ";bpid="
-          + blockPoolId + ";lv=" + storage.getLayoutVersion() +
-          ";nsInfo=" + nsInfo);
-    }
+    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
 
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
-    initFsDataSet();
+    initStorage(nsInfo);
     initPeriodicScanners(conf);
 
-    data.addBlockPool(blockPoolId, conf);
+    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
   }
 
   /**
@@ -881,31 +848,28 @@ public class DataNode extends Configured
    * Initializes the {@link #data}. The initialization is done only once, when
    * the handshake with the first namenode is completed.
    */
-  private synchronized void initFsDataSet() throws IOException {
-    if (data != null) { // Already initialized
-      return;
-    }
-
-    // get version and id info from the name-node
-    boolean simulatedFSDataset = conf.getBoolean(
-        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
-
-    if (simulatedFSDataset) {
-      storage.createStorageID(getPort());
-      // it would have been better to pass storage as a parameter to
-      // constructor below - need to augment ReflectionUtils used below.
-      conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
-      try {
-        data = (FSDatasetInterface) ReflectionUtils.newInstance(
-            Class.forName(
-            "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
-            conf);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(StringUtils.stringifyException(e));
+  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
+    final FSDatasetInterface.Factory factory
+        = FSDatasetInterface.Factory.getFactory(conf);
+
+    if (!factory.isSimulated()) {
+      final StartupOption startOpt = getStartupOption(conf);
+      if (startOpt == null) {
+        throw new IOException("Startup option not set.");
+      }
+      final String bpid = nsInfo.getBlockPoolID();
+      //read storage info, lock data dirs and transition fs state if necessary
+      storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      final StorageInfo bpStorage = storage.getBPStorage(bpid);
+      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+          + ";nsInfo=" + nsInfo);
+    }
+
+    synchronized(this) {
+      if (data == null) {
+        data = factory.createFSDatasetInterface(this, storage, conf);
       }
-    } else {
-      data = new FSDataset(this, storage, conf);
     }
   }
 
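For context: the patch above routes all dataset construction through FSDatasetInterface.Factory, so initStorage() only asks the factory whether the dataset is simulated and, if it is not, transitions the on-disk storage before the dataset is created. Below is a minimal, illustrative sketch of that factory shape, not the real Hadoop types: Dataset, DatasetFactory, OnDiskFactory, SimulatedFactory, createDataset and the "dn.simulated.dataset" key are hypothetical stand-ins; only the getFactory(conf) / isSimulated() / create-from-(node, storage, conf) call pattern mirrors what the diff actually uses.

// Illustrative sketch only -- these are NOT the real Hadoop classes.
// It mirrors the call pattern the patch relies on: a factory chosen from
// configuration, an isSimulated() probe, and a create method that builds
// the dataset implementation.
import java.util.Properties;

interface Dataset {                         // stand-in for FSDatasetInterface
  void addBlockPool(String bpid, Properties conf);
}

abstract class DatasetFactory {             // stand-in for FSDatasetInterface.Factory
  /** Picks a factory from configuration; the key below is hypothetical. */
  static DatasetFactory getFactory(Properties conf) {
    boolean simulated =
        Boolean.parseBoolean(conf.getProperty("dn.simulated.dataset", "false"));
    return simulated ? new SimulatedFactory() : new OnDiskFactory();
  }

  /** Simulated datasets hold no real blocks, so data dirs are never touched. */
  boolean isSimulated() {
    return false;
  }

  /** Stand-in for createFSDatasetInterface(datanode, storage, conf). */
  abstract Dataset createDataset(Properties conf);
}

class OnDiskFactory extends DatasetFactory {
  @Override
  Dataset createDataset(Properties conf) {
    return (bpid, c) -> { /* would back the block pool with real volumes */ };
  }
}

class SimulatedFactory extends DatasetFactory {
  @Override
  boolean isSimulated() {
    return true;
  }

  @Override
  Dataset createDataset(Properties conf) {
    return (bpid, c) -> { /* would keep block metadata in memory only */ };
  }
}

With this split, initStorage() can skip recoverTransitionRead() for simulated datasets and leave dataset construction, real or simulated, to a single code path under the synchronized block.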