|
@@ -28,6 +28,7 @@ import java.io.FileInputStream;
|
|
import java.io.FileOutputStream;
|
|
import java.io.FileOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.RandomAccessFile;
|
|
import java.io.RandomAccessFile;
|
|
|
|
+import java.net.URI;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.text.SimpleDateFormat;
|
|
import java.text.SimpleDateFormat;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
|
|
import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
|
|
import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
|
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
@@ -127,12 +129,13 @@ public class FSImage extends Storage {
|
|
* list of failed (and thus removed) storages
|
|
* list of failed (and thus removed) storages
|
|
*/
|
|
*/
|
|
protected List<StorageDirectory> removedStorageDirs = new ArrayList<StorageDirectory>();
|
|
protected List<StorageDirectory> removedStorageDirs = new ArrayList<StorageDirectory>();
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Directories for importing an image from a checkpoint.
|
|
|
|
|
|
+ * URIs for importing an image from a checkpoint. In the default case,
|
|
|
|
+ * URIs will represent directories.
|
|
*/
|
|
*/
|
|
- private Collection<File> checkpointDirs;
|
|
|
|
- private Collection<File> checkpointEditsDirs;
|
|
|
|
|
|
+ private Collection<URI> checkpointDirs;
|
|
|
|
+ private Collection<URI> checkpointEditsDirs;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Can fs-image be rolled?
|
|
* Can fs-image be rolled?
|
|
@@ -158,8 +161,10 @@ public class FSImage extends Storage {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
|
|
+ * @throws IOException
|
|
*/
|
|
*/
|
|
- FSImage(Collection<File> fsDirs, Collection<File> fsEditsDirs) {
|
|
|
|
|
|
+ FSImage(Collection<URI> fsDirs, Collection<URI> fsEditsDirs)
|
|
|
|
+ throws IOException {
|
|
this();
|
|
this();
|
|
setStorageDirectories(fsDirs, fsEditsDirs);
|
|
setStorageDirectories(fsDirs, fsEditsDirs);
|
|
}
|
|
}
|
|
@@ -170,11 +175,12 @@ public class FSImage extends Storage {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Represents an Image (image and edit file).
|
|
* Represents an Image (image and edit file).
|
|
|
|
+ * @throws IOException
|
|
*/
|
|
*/
|
|
- FSImage(File imageDir) {
|
|
|
|
|
|
+ FSImage(URI imageDir) throws IOException {
|
|
this();
|
|
this();
|
|
- ArrayList<File> dirs = new ArrayList<File>(1);
|
|
|
|
- ArrayList<File> editsDirs = new ArrayList<File>(1);
|
|
|
|
|
|
+ ArrayList<URI> dirs = new ArrayList<URI>(1);
|
|
|
|
+ ArrayList<URI> editsDirs = new ArrayList<URI>(1);
|
|
dirs.add(imageDir);
|
|
dirs.add(imageDir);
|
|
editsDirs.add(imageDir);
|
|
editsDirs.add(imageDir);
|
|
setStorageDirectories(dirs, editsDirs);
|
|
setStorageDirectories(dirs, editsDirs);
|
|
@@ -197,14 +203,16 @@ public class FSImage extends Storage {
|
|
return restoreFailedStorage;
|
|
return restoreFailedStorage;
|
|
}
|
|
}
|
|
|
|
|
|
- void setStorageDirectories(Collection<File> fsNameDirs,
|
|
|
|
- Collection<File> fsEditsDirs) {
|
|
|
|
|
|
+ void setStorageDirectories(Collection<URI> fsNameDirs,
|
|
|
|
+ Collection<URI> fsEditsDirs) throws IOException {
|
|
this.storageDirs = new ArrayList<StorageDirectory>();
|
|
this.storageDirs = new ArrayList<StorageDirectory>();
|
|
this.removedStorageDirs = new ArrayList<StorageDirectory>();
|
|
this.removedStorageDirs = new ArrayList<StorageDirectory>();
|
|
|
|
+
|
|
// Add all name dirs with appropriate NameNodeDirType
|
|
// Add all name dirs with appropriate NameNodeDirType
|
|
- for (File dirName : fsNameDirs) {
|
|
|
|
|
|
+ for (URI dirName : fsNameDirs) {
|
|
|
|
+ checkSchemeConsistency(dirName);
|
|
boolean isAlsoEdits = false;
|
|
boolean isAlsoEdits = false;
|
|
- for (File editsDirName : fsEditsDirs) {
|
|
|
|
|
|
+ for (URI editsDirName : fsEditsDirs) {
|
|
if (editsDirName.compareTo(dirName) == 0) {
|
|
if (editsDirName.compareTo(dirName) == 0) {
|
|
isAlsoEdits = true;
|
|
isAlsoEdits = true;
|
|
fsEditsDirs.remove(editsDirName);
|
|
fsEditsDirs.remove(editsDirName);
|
|
@@ -214,18 +222,49 @@ public class FSImage extends Storage {
|
|
NameNodeDirType dirType = (isAlsoEdits) ?
|
|
NameNodeDirType dirType = (isAlsoEdits) ?
|
|
NameNodeDirType.IMAGE_AND_EDITS :
|
|
NameNodeDirType.IMAGE_AND_EDITS :
|
|
NameNodeDirType.IMAGE;
|
|
NameNodeDirType.IMAGE;
|
|
- this.addStorageDir(new StorageDirectory(dirName, dirType));
|
|
|
|
|
|
+ // Add to the list of storage directories, only if the
|
|
|
|
+ // URI is of type file://
|
|
|
|
+ if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
|
|
|
|
+ == 0){
|
|
|
|
+ this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
|
|
|
+ dirType));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
// Add edits dirs if they are different from name dirs
|
|
// Add edits dirs if they are different from name dirs
|
|
- for (File dirName : fsEditsDirs) {
|
|
|
|
- this.addStorageDir(new StorageDirectory(dirName,
|
|
|
|
|
|
+ for (URI dirName : fsEditsDirs) {
|
|
|
|
+ checkSchemeConsistency(dirName);
|
|
|
|
+ // Add to the list of storage directories, only if the
|
|
|
|
+ // URI is of type file://
|
|
|
|
+ if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
|
|
|
|
+ == 0)
|
|
|
|
+ this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
|
NameNodeDirType.EDITS));
|
|
NameNodeDirType.EDITS));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- void setCheckpointDirectories(Collection<File> dirs,
|
|
|
|
- Collection<File> editsDirs) {
|
|
|
|
|
|
+ /*
|
|
|
|
+ * Checks the consistency of a URI, in particular if the scheme
|
|
|
|
+ * is specified and is supported by a concrete implementation
|
|
|
|
+ */
|
|
|
|
+ static void checkSchemeConsistency(URI u) throws IOException {
|
|
|
|
+ String scheme = u.getScheme();
|
|
|
|
+ // the URI should have a proper scheme
|
|
|
|
+ if(scheme == null)
|
|
|
|
+ throw new IOException("Undefined scheme for " + u);
|
|
|
|
+ else {
|
|
|
|
+ try {
|
|
|
|
+ // the scheme should be enumerated as JournalType
|
|
|
|
+ JournalType.valueOf(scheme.toUpperCase());
|
|
|
|
+ } catch (IllegalArgumentException iae){
|
|
|
|
+ throw new IOException("Unknown scheme " + scheme +
|
|
|
|
+ ". It should correspond to a JournalType enumeration value");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ void setCheckpointDirectories(Collection<URI> dirs,
|
|
|
|
+ Collection<URI> editsDirs) {
|
|
checkpointDirs = dirs;
|
|
checkpointDirs = dirs;
|
|
checkpointEditsDirs = editsDirs;
|
|
checkpointEditsDirs = editsDirs;
|
|
}
|
|
}
|
|
@@ -235,7 +274,7 @@ public class FSImage extends Storage {
|
|
}
|
|
}
|
|
|
|
|
|
List<StorageDirectory> getRemovedStorageDirs() {
|
|
List<StorageDirectory> getRemovedStorageDirs() {
|
|
- return this.removedStorageDirs;
|
|
|
|
|
|
+ return this.removedStorageDirs;
|
|
}
|
|
}
|
|
|
|
|
|
File getEditFile(StorageDirectory sd) {
|
|
File getEditFile(StorageDirectory sd) {
|
|
@@ -256,21 +295,39 @@ public class FSImage extends Storage {
|
|
return list;
|
|
return list;
|
|
}
|
|
}
|
|
|
|
|
|
- Collection<File> getDirectories(NameNodeDirType dirType) {
|
|
|
|
- ArrayList<File> list = new ArrayList<File>();
|
|
|
|
|
|
+ Collection<URI> getDirectories(NameNodeDirType dirType)
|
|
|
|
+ throws IOException {
|
|
|
|
+ ArrayList<URI> list = new ArrayList<URI>();
|
|
Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
|
|
Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
|
|
dirIterator(dirType);
|
|
dirIterator(dirType);
|
|
for ( ;it.hasNext(); ) {
|
|
for ( ;it.hasNext(); ) {
|
|
- list.add(it.next().getRoot());
|
|
|
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
|
+ try {
|
|
|
|
+ list.add(new URI("file://" + sd.getRoot().getAbsolutePath()));
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new IOException("Exception while processing " +
|
|
|
|
+ "StorageDirectory " + sd.getRoot().getAbsolutePath() + ". The"
|
|
|
|
+ + " full error message is " + e.getMessage());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return list;
|
|
return list;
|
|
}
|
|
}
|
|
|
|
|
|
- Collection<File> getImageDirectories() {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Retrieve current directories of type IMAGE
|
|
|
|
+ * @return Collection of URI representing image directories
|
|
|
|
+ * @throws IOException in case of URI processing error
|
|
|
|
+ */
|
|
|
|
+ Collection<URI> getImageDirectories() throws IOException {
|
|
return getDirectories(NameNodeDirType.IMAGE);
|
|
return getDirectories(NameNodeDirType.IMAGE);
|
|
}
|
|
}
|
|
|
|
|
|
- Collection<File> getEditsDirectories() {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Retrieve current directories of type EDITS
|
|
|
|
+ * @return Collection of URI representing edits directories
|
|
|
|
+ * @throws IOException in case of URI processing error
|
|
|
|
+ */
|
|
|
|
+ Collection<URI> getEditsDirectories() throws IOException {
|
|
return getDirectories(NameNodeDirType.EDITS);
|
|
return getDirectories(NameNodeDirType.EDITS);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -300,8 +357,8 @@ public class FSImage extends Storage {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
* @return true if the image needs to be saved or false otherwise
|
|
* @return true if the image needs to be saved or false otherwise
|
|
*/
|
|
*/
|
|
- boolean recoverTransitionRead(Collection<File> dataDirs,
|
|
|
|
- Collection<File> editsDirs,
|
|
|
|
|
|
+ boolean recoverTransitionRead(Collection<URI> dataDirs,
|
|
|
|
+ Collection<URI> editsDirs,
|
|
StartupOption startOpt
|
|
StartupOption startOpt
|
|
) throws IOException {
|
|
) throws IOException {
|
|
assert startOpt != StartupOption.FORMAT :
|
|
assert startOpt != StartupOption.FORMAT :
|
|
@@ -740,7 +797,7 @@ public class FSImage extends Storage {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // if there are some edit log streams to remove
|
|
|
|
|
|
+ // if there are some edit log streams to remove
|
|
if(propagate && al != null)
|
|
if(propagate && al != null)
|
|
editLog.processIOError(al, false);
|
|
editLog.processIOError(al, false);
|
|
|
|
|
|
@@ -1667,7 +1724,7 @@ public class FSImage extends Storage {
|
|
return; //nothing to restore
|
|
return; //nothing to restore
|
|
|
|
|
|
LOG.info("FSImage.attemptRestoreRemovedStorage: check removed(failed) " +
|
|
LOG.info("FSImage.attemptRestoreRemovedStorage: check removed(failed) " +
|
|
- "storarge. removedStorages size = " + removedStorageDirs.size());
|
|
|
|
|
|
+ "storarge. removedStorages size = " + removedStorageDirs.size());
|
|
for(Iterator<StorageDirectory> it = this.removedStorageDirs.iterator(); it.hasNext();) {
|
|
for(Iterator<StorageDirectory> it = this.removedStorageDirs.iterator(); it.hasNext();) {
|
|
StorageDirectory sd = it.next();
|
|
StorageDirectory sd = it.next();
|
|
File root = sd.getRoot();
|
|
File root = sd.getRoot();
|
|
@@ -1823,31 +1880,63 @@ public class FSImage extends Storage {
|
|
+ FSConstants.LAYOUT_VERSION + " is initialized.");
|
|
+ FSConstants.LAYOUT_VERSION + " is initialized.");
|
|
}
|
|
}
|
|
|
|
|
|
- static Collection<File> getCheckpointDirs(Configuration conf,
|
|
|
|
- String defaultName) {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Retrieve checkpoint dirs from configuration.
|
|
|
|
+ *
|
|
|
|
+ * @param conf, the Configuration
|
|
|
|
+ * @param defaultValue, a default value for the attribute, if null
|
|
|
|
+ * @return a Collection of URIs representing the values in
|
|
|
|
+ * fs.checkpoint.dir configuration property
|
|
|
|
+ */
|
|
|
|
+ static Collection<URI> getCheckpointDirs(Configuration conf,
|
|
|
|
+ String defaultValue) {
|
|
Collection<String> dirNames = conf.getStringCollection("fs.checkpoint.dir");
|
|
Collection<String> dirNames = conf.getStringCollection("fs.checkpoint.dir");
|
|
- if (dirNames.size() == 0 && defaultName != null) {
|
|
|
|
- dirNames.add(defaultName);
|
|
|
|
|
|
+ if (dirNames.size() == 0 && defaultValue != null) {
|
|
|
|
+ dirNames.add(defaultValue);
|
|
}
|
|
}
|
|
- Collection<File> dirs = new ArrayList<File>(dirNames.size());
|
|
|
|
|
|
+ Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
|
|
for(String name : dirNames) {
|
|
for(String name : dirNames) {
|
|
- dirs.add(new File(name));
|
|
|
|
|
|
+ try {
|
|
|
|
+ // process value as URI
|
|
|
|
+ URI u = new URI(name);
|
|
|
|
+ // if scheme is undefined, then assume it's file://
|
|
|
|
+ if(u.getScheme() == null)
|
|
|
|
+ u = new URI("file://" + new File(name).getAbsolutePath());
|
|
|
|
+ // check that scheme is not null (trivial) and supported
|
|
|
|
+ checkSchemeConsistency(u);
|
|
|
|
+ dirs.add(u);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.error("Error while processing URI: " + name +
|
|
|
|
+ ". The error message was: " + e.getMessage());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return dirs;
|
|
return dirs;
|
|
}
|
|
}
|
|
|
|
|
|
- static Collection<File> getCheckpointEditsDirs(Configuration conf,
|
|
|
|
- String defaultName) {
|
|
|
|
|
|
+ static Collection<URI> getCheckpointEditsDirs(Configuration conf,
|
|
|
|
+ String defaultName) {
|
|
Collection<String> dirNames =
|
|
Collection<String> dirNames =
|
|
- conf.getStringCollection("fs.checkpoint.edits.dir");
|
|
|
|
- if (dirNames.size() == 0 && defaultName != null) {
|
|
|
|
- dirNames.add(defaultName);
|
|
|
|
- }
|
|
|
|
- Collection<File> dirs = new ArrayList<File>(dirNames.size());
|
|
|
|
- for(String name : dirNames) {
|
|
|
|
- dirs.add(new File(name));
|
|
|
|
- }
|
|
|
|
- return dirs;
|
|
|
|
|
|
+ conf.getStringCollection("fs.checkpoint.edits.dir");
|
|
|
|
+ if (dirNames.size() == 0 && defaultName != null) {
|
|
|
|
+ dirNames.add(defaultName);
|
|
|
|
+ }
|
|
|
|
+ Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
|
|
|
|
+ for(String name : dirNames) {
|
|
|
|
+ try {
|
|
|
|
+ // process value as URI
|
|
|
|
+ URI u = new URI(name);
|
|
|
|
+ // if scheme is undefined, then assume it's file://
|
|
|
|
+ if(u.getScheme() == null)
|
|
|
|
+ u = new URI("file://" + new File(name).getAbsolutePath());
|
|
|
|
+ // check that scheme is not null (trivial) and supported
|
|
|
|
+ checkSchemeConsistency(u);
|
|
|
|
+ dirs.add(u);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.error("Error while processing URI: " + name +
|
|
|
|
+ ". The error message was: " + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return dirs;
|
|
}
|
|
}
|
|
|
|
|
|
static private final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
|
|
static private final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
|