|
@@ -26,12 +26,14 @@ import java.io.FileInputStream;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.RandomAccessFile;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.nio.channels.FileChannel;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Iterator;
|
|
|
-import java.lang.Math;
|
|
|
-import java.nio.channels.FileChannel;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
@@ -44,9 +46,14 @@ import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|
|
-import org.apache.hadoop.io.*;
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
-import org.apache.hadoop.fs.permission.*;
|
|
|
+import org.apache.hadoop.io.ArrayWritable;
|
|
|
+import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.io.LongWritable;
|
|
|
+import org.apache.hadoop.io.UTF8;
|
|
|
+import org.apache.hadoop.io.Writable;
|
|
|
+import org.apache.hadoop.io.WritableFactories;
|
|
|
+import org.apache.hadoop.io.WritableFactory;
|
|
|
+import org.mortbay.log.Log;
|
|
|
|
|
|
/**
|
|
|
* FSEditLog maintains a log of the namespace modifications.
|
|
@@ -360,20 +367,30 @@ public class FSEditLog {
|
|
|
numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
|
|
|
if (editStreams == null)
|
|
|
editStreams = new ArrayList<EditLogOutputStream>();
|
|
|
+
|
|
|
+ ArrayList<StorageDirectory> al = null;
|
|
|
for (Iterator<StorageDirectory> it =
|
|
|
fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
|
File eFile = getEditFile(sd);
|
|
|
try {
|
|
|
- EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
|
|
|
- editStreams.add(eStream);
|
|
|
+ addNewEditLogStream(eFile);
|
|
|
} catch (IOException e) {
|
|
|
FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
|
|
|
// Remove the directory from list of storage directories
|
|
|
- fsimage.removedStorageDirs.add(sd);
|
|
|
- it.remove();
|
|
|
+ if(al == null) al = new ArrayList<StorageDirectory>(1);
|
|
|
+ al.add(sd);
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if(al != null) fsimage.processIOError(al, false);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public synchronized void addNewEditLogStream(File eFile) throws IOException {
|
|
|
+ EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
|
|
|
+ editStreams.add(eStream);
|
|
|
}
|
|
|
|
|
|
public synchronized void createEditLogFile(File name) throws IOException {
|
|
@@ -424,7 +441,7 @@ public class FSEditLog {
|
|
|
errorStreams.add(eStream);
|
|
|
}
|
|
|
}
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
editStreams.clear();
|
|
|
}
|
|
|
|
|
@@ -441,80 +458,101 @@ public class FSEditLog {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * If there is an IO Error on any log operations, remove that
|
|
|
- * directory from the list of directories.
|
|
|
- * If no more directories remain, then exit.
|
|
|
+ * The specified streams have IO errors. Close and remove them.
|
|
|
+ * If propagate is true - close related StorageDirectories.
|
|
|
+ * (is called with propagate value true from everywhere
|
|
|
+ * except fsimage.processIOError)
|
|
|
*/
|
|
|
- synchronized void processIOError(int index) {
|
|
|
+ synchronized void processIOError(
|
|
|
+ ArrayList<EditLogOutputStream> errorStreams,
|
|
|
+ boolean propagate) {
|
|
|
+
|
|
|
+ String lsd = fsimage.listStorageDirectories();
|
|
|
+ FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
|
|
|
+
|
|
|
+ if (errorStreams == null || errorStreams.size() == 0) {
|
|
|
+ return; // nothing to do
|
|
|
+ }
|
|
|
+
|
|
|
+ //EditLogOutputStream
|
|
|
if (editStreams == null || editStreams.size() <= 1) {
|
|
|
FSNamesystem.LOG.fatal(
|
|
|
"Fatal Error : All storage directories are inaccessible.");
|
|
|
Runtime.getRuntime().exit(-1);
|
|
|
}
|
|
|
- assert(index < getNumEditStreams());
|
|
|
-
|
|
|
- EditLogOutputStream eStream = editStreams.get(index);
|
|
|
- removeStream(index);
|
|
|
|
|
|
- if(!(eStream instanceof EditLogFileOutputStream))
|
|
|
- return; // non file streams don't have associated storage directories
|
|
|
+ ArrayList<StorageDirectory> al = null;
|
|
|
+ for (EditLogOutputStream eStream : errorStreams) {
|
|
|
+ FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName()
|
|
|
+ + "; removing it");
|
|
|
|
|
|
- EditLogFileOutputStream eFStream = (EditLogFileOutputStream)eStream;
|
|
|
- File parentStorageDir = eFStream.getFile().getParentFile().getParentFile();
|
|
|
- //
|
|
|
- // Invoke the ioerror routine of the fsimage
|
|
|
- //
|
|
|
- fsimage.processIOError(parentStorageDir);
|
|
|
+ StorageDirectory storageDir;
|
|
|
+ if(propagate && eStream.getType() == JournalType.FILE && //find SD
|
|
|
+ (storageDir = getStorage(eStream)) != null) {
|
|
|
+ FSNamesystem.LOG.info("about to remove corresponding storage:"
|
|
|
+ + storageDir.getRoot().getAbsolutePath());
|
|
|
+ // remove corresponding storage dir
|
|
|
+ if(al == null) al = new ArrayList<StorageDirectory>(1);
|
|
|
+ al.add(storageDir);
|
|
|
+ }
|
|
|
+ Iterator<EditLogOutputStream> ies = editStreams.iterator();
|
|
|
+ while (ies.hasNext()) {
|
|
|
+ EditLogOutputStream es = ies.next();
|
|
|
+ if (es == eStream) {
|
|
|
+ try { eStream.close(); } catch (IOException e) {
|
|
|
+ // nothing to do.
|
|
|
+ FSNamesystem.LOG.warn("Failed to close eStream " + eStream.getName()
|
|
|
+ + " before removing it (might be ok)");
|
|
|
+ }
|
|
|
+ ies.remove();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // removed failed SDs
|
|
|
+ if(propagate && al != null) fsimage.processIOError(al, false);
|
|
|
+
|
|
|
+ //for the rest of the streams
|
|
|
+ if(propagate) incrementCheckpointTime();
|
|
|
+
|
|
|
+ lsd = fsimage.listStorageDirectories();
|
|
|
+ FSNamesystem.LOG.info("at the end current list of storage dirs:" + lsd);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
- * If there is an IO Error on any log operations on storage directory,
|
|
|
- * remove any stream associated with that directory
|
|
|
+ * get an editStream corresponding to a sd
|
|
|
+ * @param es - stream to remove
|
|
|
+ * @return the matching stream
|
|
|
*/
|
|
|
- synchronized void processIOError(StorageDirectory sd) {
|
|
|
- // Try to remove stream only if one should exist
|
|
|
- if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
|
|
|
- return;
|
|
|
- if (editStreams == null || editStreams.size() <= 1) {
|
|
|
- FSNamesystem.LOG.fatal(
|
|
|
- "Fatal Error : All storage directories are inaccessible.");
|
|
|
- Runtime.getRuntime().exit(-1);
|
|
|
- }
|
|
|
- for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
- File parentStorageDir = ((EditLogFileOutputStream)editStreams
|
|
|
- .get(idx)).getFile()
|
|
|
- .getParentFile().getParentFile();
|
|
|
- if (parentStorageDir.getName().equals(sd.getRoot().getName()))
|
|
|
- removeStream(idx);
|
|
|
+ public StorageDirectory getStorage(EditLogOutputStream es) {
|
|
|
+ String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
|
|
|
+ .getParentFile().getParentFile().getAbsolutePath();
|
|
|
+
|
|
|
+ Iterator<StorageDirectory> it = fsimage.dirIterator();
|
|
|
+ while (it.hasNext()) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
+ FSNamesystem.LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
|
|
|
+ if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
|
|
|
+ return sd;
|
|
|
}
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * The specified streams have IO errors. Remove them from logging
|
|
|
- * new transactions.
|
|
|
+ * get an editStream corresponding to a sd
|
|
|
+ * @param sd
|
|
|
+ * @return the matching stream
|
|
|
*/
|
|
|
- synchronized void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
|
|
|
- if (errorStreams == null) {
|
|
|
- return; // nothing to do
|
|
|
- }
|
|
|
- for (int idx = 0; idx < errorStreams.size(); idx++) {
|
|
|
- EditLogOutputStream eStream = errorStreams.get(idx);
|
|
|
- int j = 0;
|
|
|
- int numEditStreams = editStreams.size();
|
|
|
- for (j = 0; j < numEditStreams; j++) {
|
|
|
- if (editStreams.get(j) == eStream) {
|
|
|
- FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName());
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (j == numEditStreams) {
|
|
|
- FSNamesystem.LOG.error("Unable to find sync log on which " +
|
|
|
- " IO error occured.");
|
|
|
- continue;
|
|
|
- }
|
|
|
- processIOError(j);
|
|
|
- }
|
|
|
- incrementCheckpointTime();
|
|
|
+ public EditLogOutputStream getEditsStream(StorageDirectory sd) {
|
|
|
+ for (EditLogOutputStream es : editStreams) {
|
|
|
+ File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
|
|
|
+ .getParentFile().getParentFile();
|
|
|
+ if (parentStorageDir.getName().equals(sd.getRoot().getName()))
|
|
|
+ return es;
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -905,6 +943,7 @@ public class FSEditLog {
|
|
|
ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
long start = FSNamesystem.now();
|
|
|
for(EditLogOutputStream eStream : editStreams) {
|
|
|
+ Log.debug("loggin edits into " + eStream.getName() + " stream");
|
|
|
if(!eStream.isOperationSupported(op))
|
|
|
continue;
|
|
|
try {
|
|
@@ -916,7 +955,7 @@ public class FSEditLog {
|
|
|
errorStreams.add(eStream);
|
|
|
}
|
|
|
}
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
recordTransaction(start);
|
|
|
}
|
|
|
|
|
@@ -1001,7 +1040,7 @@ public class FSEditLog {
|
|
|
long elapsed = FSNamesystem.now() - start;
|
|
|
|
|
|
synchronized (this) {
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
synctxid = syncStart;
|
|
|
isSyncRunning = false;
|
|
|
this.notifyAll();
|
|
@@ -1178,6 +1217,7 @@ public class FSEditLog {
|
|
|
assert getNumEditsDirs() <= getNumEditStreams() :
|
|
|
"Number of edits directories should not exceed the number of streams.";
|
|
|
long size = 0;
|
|
|
+ ArrayList<EditLogOutputStream> al = null;
|
|
|
for (int idx = 0; idx < getNumEditStreams(); idx++) {
|
|
|
EditLogOutputStream es = editStreams.get(idx);
|
|
|
try {
|
|
@@ -1188,11 +1228,21 @@ public class FSEditLog {
|
|
|
} catch (IOException e) {
|
|
|
FSImage.LOG.warn("getEditLogSize: editstream.length failed. removing editlog (" +
|
|
|
idx + ") " + es.getName());
|
|
|
- processIOError(idx);
|
|
|
+ if(al==null) al = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ al.add(es);
|
|
|
}
|
|
|
}
|
|
|
+ if(al!=null) processIOError(al, true);
|
|
|
return size;
|
|
|
}
|
|
|
+
|
|
|
+ public String listEditsStreams() {
|
|
|
+ StringBuffer buf = new StringBuffer();
|
|
|
+ for (EditLogOutputStream os : editStreams) {
|
|
|
+ buf.append(os.getName() + ";");
|
|
|
+ }
|
|
|
+ return buf.toString();
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Closes the current edit log and opens edits.new.
|
|
@@ -1256,7 +1306,7 @@ public class FSEditLog {
|
|
|
errorStreams.add(eStream);
|
|
|
}
|
|
|
}
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1316,18 +1366,21 @@ public class FSEditLog {
|
|
|
errorStreams.add(eStream);
|
|
|
}
|
|
|
}
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Return the name of the edit file
|
|
|
*/
|
|
|
synchronized File getFsEditName() {
|
|
|
- StorageDirectory sd = null;
|
|
|
+ StorageDirectory sd = null;
|
|
|
for (Iterator<StorageDirectory> it =
|
|
|
- fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();)
|
|
|
- sd = it.next();
|
|
|
- return getEditFile(sd);
|
|
|
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
|
|
+ sd = it.next();
|
|
|
+ if(sd.getRoot().canRead())
|
|
|
+ return getEditFile(sd);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1463,7 +1516,7 @@ public class FSEditLog {
|
|
|
errorStreams.add(eStream);
|
|
|
}
|
|
|
}
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
recordTransaction(start);
|
|
|
}
|
|
|
|
|
@@ -1552,7 +1605,7 @@ public class FSEditLog {
|
|
|
}
|
|
|
assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
|
|
|
"Not a backup node corresponds to a backup stream";
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
}
|
|
|
|
|
|
synchronized boolean checkBackupRegistration(
|
|
@@ -1579,7 +1632,7 @@ public class FSEditLog {
|
|
|
}
|
|
|
assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
|
|
|
"Not a backup node corresponds to a backup stream";
|
|
|
- processIOError(errorStreams);
|
|
|
+ processIOError(errorStreams, true);
|
|
|
return regAllowed;
|
|
|
}
|
|
|
}
|