|
@@ -55,6 +55,7 @@ import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -172,43 +173,99 @@ public class DataStorage extends Storage {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Analyze storage directories.
|
|
|
- * Recover from previous transitions if required.
|
|
|
- * Perform fs state transition if necessary depending on the namespace info.
|
|
|
- * Read storage info.
|
|
|
- * <br>
|
|
|
- * This method should be synchronized between multiple DN threads. Only the
|
|
|
- * first DN thread does DN level storage dir recoverTransitionRead.
|
|
|
- *
|
|
|
+ * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}}
|
|
|
+ */
|
|
|
+ private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
|
|
|
+ this.layoutVersion = getServiceLayoutVersion();
|
|
|
+ for (StorageDirectory dir : dirs) {
|
|
|
+ writeProperties(dir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a list of volumes to be managed by DataStorage. If the volume is empty,
|
|
|
+ * format it, otherwise recover it from previous transitions if required.
|
|
|
+ *
|
|
|
+ * @param datanode the reference to DataNode.
|
|
|
* @param nsInfo namespace information
|
|
|
* @param dataDirs array of data storage directories
|
|
|
* @param startOpt startup option
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- synchronized void recoverTransitionRead(DataNode datanode,
|
|
|
+ synchronized void addStorageLocations(DataNode datanode,
|
|
|
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
|
|
StartupOption startOpt)
|
|
|
throws IOException {
|
|
|
- if (initialized) {
|
|
|
- // DN storage has been initialized, no need to do anything
|
|
|
- return;
|
|
|
+ // Similar to recoverTransitionRead, it first ensures the datanode level
|
|
|
+ // format is completed.
|
|
|
+ List<StorageLocation> tmpDataDirs =
|
|
|
+ new ArrayList<StorageLocation>(dataDirs);
|
|
|
+ addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
|
|
|
+
|
|
|
+ Collection<File> bpDataDirs = new ArrayList<File>();
|
|
|
+ String bpid = nsInfo.getBlockPoolID();
|
|
|
+ for (StorageLocation dir : dataDirs) {
|
|
|
+ File dnRoot = dir.getFile();
|
|
|
+ File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
|
|
|
+ STORAGE_DIR_CURRENT));
|
|
|
+ bpDataDirs.add(bpRoot);
|
|
|
}
|
|
|
- LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
|
|
|
- + " and name-node layout version: " + nsInfo.getLayoutVersion());
|
|
|
-
|
|
|
- // 1. For each data directory calculate its state and
|
|
|
- // check whether all is consistent before transitioning.
|
|
|
- // Format and recover.
|
|
|
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
|
|
- ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
|
|
|
+ // mkdir for the list of BlockPoolStorage
|
|
|
+ makeBlockPoolDataDir(bpDataDirs, null);
|
|
|
+ BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
|
|
|
+ if (bpStorage == null) {
|
|
|
+ bpStorage = new BlockPoolSliceStorage(
|
|
|
+ nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
|
|
|
+ nsInfo.getClusterID());
|
|
|
+ }
|
|
|
+
|
|
|
+ bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
|
|
|
+ addBlockPoolStorage(bpid, bpStorage);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a list of volumes to be managed by this DataStorage. If the volume is
|
|
|
+ * empty, it formats the volume, otherwise it recovers it from previous
|
|
|
+ * transitions if required.
|
|
|
+ *
|
|
|
+ * If isInitialize is false, only the directories that have finished the
|
|
|
+ * doTransition() process will be added into DataStorage.
|
|
|
+ *
|
|
|
+ * @param datanode the reference to DataNode.
|
|
|
+ * @param nsInfo namespace information
|
|
|
+ * @param dataDirs array of data storage directories
|
|
|
+ * @param startOpt startup option
|
|
|
+ * @param isInitialize whether it is called when DataNode starts up.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private synchronized void addStorageLocations(DataNode datanode,
|
|
|
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
|
|
+ StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
|
|
|
+ throws IOException {
|
|
|
+ Set<String> existingStorageDirs = new HashSet<String>();
|
|
|
+ for (int i = 0; i < getNumStorageDirs(); i++) {
|
|
|
+ existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 1. For each data directory calculate its state and check whether all is
|
|
|
+ // consistent before transitioning. Format and recover.
|
|
|
+ ArrayList<StorageState> dataDirStates =
|
|
|
+ new ArrayList<StorageState>(dataDirs.size());
|
|
|
+ List<StorageDirectory> addedStorageDirectories =
|
|
|
+ new ArrayList<StorageDirectory>();
|
|
|
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
|
|
|
File dataDir = it.next().getFile();
|
|
|
+ if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
|
|
|
+ LOG.info("Storage directory " + dataDir + " has already been used.");
|
|
|
+ it.remove();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
StorageDirectory sd = new StorageDirectory(dataDir);
|
|
|
StorageState curState;
|
|
|
try {
|
|
|
curState = sd.analyzeStorage(startOpt, this);
|
|
|
// sd is locked but not opened
|
|
|
- switch(curState) {
|
|
|
+ switch (curState) {
|
|
|
case NORMAL:
|
|
|
break;
|
|
|
case NON_EXISTENT:
|
|
@@ -217,7 +274,8 @@ public class DataStorage extends Storage {
|
|
|
it.remove();
|
|
|
continue;
|
|
|
case NOT_FORMATTED: // format
|
|
|
- LOG.info("Storage directory " + dataDir + " is not formatted");
|
|
|
+ LOG.info("Storage directory " + dataDir + " is not formatted for "
|
|
|
+ + nsInfo.getBlockPoolID());
|
|
|
LOG.info("Formatting ...");
|
|
|
format(sd, nsInfo, datanode.getDatanodeUuid());
|
|
|
break;
|
|
@@ -231,33 +289,82 @@ public class DataStorage extends Storage {
|
|
|
//continue with other good dirs
|
|
|
continue;
|
|
|
}
|
|
|
- // add to the storage list
|
|
|
- addStorageDir(sd);
|
|
|
+ if (isInitialize) {
|
|
|
+ addStorageDir(sd);
|
|
|
+ }
|
|
|
+ addedStorageDirectories.add(sd);
|
|
|
dataDirStates.add(curState);
|
|
|
}
|
|
|
|
|
|
- if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist
|
|
|
+ if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
|
|
|
+ // none of the data dirs exist
|
|
|
+ if (ignoreExistingDirs) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
throw new IOException(
|
|
|
"All specified directories are not accessible or do not exist.");
|
|
|
+ }
|
|
|
|
|
|
// 2. Do transitions
|
|
|
// Each storage directory is treated individually.
|
|
|
- // During startup some of them can upgrade or rollback
|
|
|
- // while others could be uptodate for the regular startup.
|
|
|
- try {
|
|
|
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
|
|
|
- createStorageID(getStorageDir(idx));
|
|
|
+ // During startup some of them can upgrade or rollback
|
|
|
+ // while others could be up-to-date for the regular startup.
|
|
|
+ for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
|
|
|
+ it.hasNext(); ) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
+ try {
|
|
|
+ doTransition(datanode, sd, nsInfo, startOpt);
|
|
|
+ createStorageID(sd);
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (!isInitialize) {
|
|
|
+ sd.unlock();
|
|
|
+ it.remove();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ unlockAll();
|
|
|
+ throw e;
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- unlockAll();
|
|
|
- throw e;
|
|
|
}
|
|
|
|
|
|
- // 3. Update all storages. Some of them might have just been formatted.
|
|
|
- this.writeAll();
|
|
|
+ // 3. Update all successfully loaded storages. Some of them might have just
|
|
|
+ // been formatted.
|
|
|
+ this.writeAll(addedStorageDirectories);
|
|
|
+
|
|
|
+ // 4. Make newly loaded storage directories visible for service.
|
|
|
+ if (!isInitialize) {
|
|
|
+ this.storageDirs.addAll(addedStorageDirectories);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Analyze storage directories.
|
|
|
+ * Recover from previous transitions if required.
|
|
|
+ * Perform fs state transition if necessary depending on the namespace info.
|
|
|
+ * Read storage info.
|
|
|
+ * <br>
|
|
|
+ * This method should be synchronized between multiple DN threads. Only the
|
|
|
+ * first DN thread does DN level storage dir recoverTransitionRead.
|
|
|
+ *
|
|
|
+ * @param nsInfo namespace information
|
|
|
+ * @param dataDirs array of data storage directories
|
|
|
+ * @param startOpt startup option
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ synchronized void recoverTransitionRead(DataNode datanode,
|
|
|
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
|
|
+ StartupOption startOpt)
|
|
|
+ throws IOException {
|
|
|
+ if (initialized) {
|
|
|
+ // DN storage has been initialized, no need to do anything
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
|
|
|
+ + " and NameNode layout version: " + nsInfo.getLayoutVersion());
|
|
|
+
|
|
|
+ this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
|
|
+ addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
|
|
|
|
|
|
- // 4. mark DN storage is initialized
|
|
|
+ // mark DN storage is initialized
|
|
|
this.initialized = true;
|
|
|
}
|
|
|
|