|
@@ -40,6 +40,7 @@ import java.text.SimpleDateFormat;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -179,7 +180,13 @@ public class FSImage extends Storage {
|
|
|
/**
|
|
|
* Used for saving the image to disk
|
|
|
*/
|
|
|
- static private final FsPermission FILE_PERM = new FsPermission((short)0);
|
|
|
+  // Per-thread scratch FsPermission. It is mutated via fromShort() each time
+  // an inode is written, so every thread that saves an image gets its own
+  // instance rather than sharing a single static object.
+  private static final ThreadLocal<FsPermission> FILE_PERM =
+      new ThreadLocal<FsPermission>() {
+        /** Creates the calling thread's instance with all permission bits cleared. */
+        @Override
+        protected FsPermission initialValue() {
+          final short noPermissionBits = 0;
+          return new FsPermission(noPermissionBits);
+        }
+      };
|
|
|
static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
|
|
|
|
|
|
private static final Random R = new Random();
|
|
@@ -909,30 +916,34 @@ public class FSImage extends Storage {
|
|
|
* @param propagate - flag, if set - then call corresponding EditLog stream's
|
|
|
* processIOError function.
|
|
|
*/
|
|
|
- void processIOError(ArrayList<StorageDirectory> sds, boolean propagate) {
|
|
|
+ void processIOError(List<StorageDirectory> sds, boolean propagate) {
|
|
|
ArrayList<EditLogOutputStream> al = null;
|
|
|
- for(StorageDirectory sd:sds) {
|
|
|
- // if has a stream assosiated with it - remove it too..
|
|
|
- if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
|
|
|
- EditLogOutputStream eStream = editLog.getEditsStream(sd);
|
|
|
- if(al == null) al = new ArrayList<EditLogOutputStream>(1);
|
|
|
- al.add(eStream);
|
|
|
- }
|
|
|
-
|
|
|
- for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
|
|
|
- StorageDirectory sd1 = it.next();
|
|
|
- if (sd.equals(sd1)) {
|
|
|
- //add storage to the removed list
|
|
|
- LOG.warn("FSImage:processIOError: removing storage: "
|
|
|
- + sd.getRoot().getPath());
|
|
|
- try {
|
|
|
- sd1.unlock(); //unlock before removing (in case it will be restored)
|
|
|
- } catch (Exception e) {
|
|
|
- // nothing
|
|
|
+ synchronized (sds) {
|
|
|
+ for (StorageDirectory sd : sds) {
|
|
|
+ // if it has a stream associated with it - remove it too.
|
|
|
+ if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
|
|
|
+ EditLogOutputStream eStream = editLog.getEditsStream(sd);
|
|
|
+ if (al == null)
|
|
|
+ al = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ al.add(eStream);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd1 = it.next();
|
|
|
+ if (sd.equals(sd1)) {
|
|
|
+ // add storage to the removed list
|
|
|
+ LOG.warn("FSImage:processIOError: removing storage: "
|
|
|
+ + sd.getRoot().getPath());
|
|
|
+ try {
|
|
|
+ sd1.unlock(); // unlock before removing (in case it will be
|
|
|
+ // restored)
|
|
|
+ } catch (Exception e) {
|
|
|
+ // nothing
|
|
|
+ }
|
|
|
+ removedStorageDirs.add(sd1);
|
|
|
+ it.remove();
|
|
|
+ break;
|
|
|
}
|
|
|
- removedStorageDirs.add(sd1);
|
|
|
- it.remove();
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1468,6 +1479,53 @@ public class FSImage extends Storage {
|
|
|
public void setImageDigest(MD5Hash digest) {
|
|
|
this.imageDigest = digest;
|
|
|
}
|
|
|
+  /**
+   * FSImageSaver is run in a separate thread when saving FSImage.
+   * There is one thread per copy of the image.
+   *
+   * FSImageSaver assumes that it was launched from a thread that holds
+   * the FSNamesystem lock and that waits for the FSImageSaver thread to
+   * finish. This way we are guaranteed that the namespace is not being
+   * updated while multiple instances of FSImageSaver are traversing it
+   * and writing it out.
+   *
+   * A failure to save never propagates out of {@link #run()}: it is logged
+   * and the storage directory is appended to the error list handed to the
+   * constructor, so that the launching thread can inspect it afterwards.
+   */
+  private class FSImageSaver implements Runnable {
+    /** Storage directory this saver writes into. */
+    private final StorageDirectory sd;
+    /** Collector for directories that could not be saved; shared by all savers. */
+    private final List<StorageDirectory> errorSDs;
+
+    /**
+     * @param sd storage directory to save into
+     * @param errorSDs list that receives {@code sd} if saving fails; it is
+     *                 written from the saver thread, so it must tolerate
+     *                 concurrent additions
+     */
+    FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs) {
+      this.sd = sd;
+      this.errorSDs = errorSDs;
+    }
+
+    /** Saves the current state into the storage directory, recording any failure. */
+    public void run() {
+      try {
+        saveCurrent(sd);
+      } catch (Throwable t) {
+        // Catch everything, not just IOException: an unchecked failure would
+        // otherwise end this thread silently and the directory would never
+        // be reported as failed to the thread that is waiting for us.
+        LOG.error("Unable to save image for " + sd.getRoot(), t);
+        errorSDs.add(sd);
+      }
+    }
+
+    /** Also used as the name of the thread that runs this saver. */
+    @Override
+    public String toString() {
+      return "FSImageSaver for " + sd.getRoot() +
+             " of type " + sd.getStorageDirType();
+    }
+  }
|
|
|
+
|
|
|
+  /**
+   * Blocks until every thread in the given list has terminated.
+   *
+   * An interrupt received while waiting does not abort the wait: the join
+   * is retried until the thread is no longer alive. The interrupt is not
+   * lost, however; the calling thread's interrupt status is restored
+   * before this method returns.
+   *
+   * @param threads threads to wait for
+   */
+  private void waitForThreads(List<Thread> threads) {
+    boolean interrupted = false;
+    for (Thread thread : threads) {
+      while (thread.isAlive()) {
+        try {
+          thread.join();
+        } catch (InterruptedException iex) {
+          // Remember the interrupt instead of re-asserting it right away:
+          // with the flag set, the retried join() would throw immediately
+          // and this loop would spin.
+          interrupted = true;
+          LOG.error("Caught exception while waiting for thread " +
+                    thread.getName() + " to finish. Retrying join");
+        }
+      }
+    }
+    if (interrupted) {
+      // Hand the interrupt back to the caller now that all joins are done.
+      Thread.currentThread().interrupt();
+    }
+  }
|
|
|
/**
|
|
|
* Save the contents of the FS image and create empty edits.
|
|
|
*
|
|
@@ -1488,7 +1546,8 @@ public class FSImage extends Storage {
|
|
|
editLog.close();
|
|
|
if(renewCheckpointTime)
|
|
|
this.checkpointTime = now();
|
|
|
- ArrayList<StorageDirectory> errorSDs = new ArrayList<StorageDirectory>();
|
|
|
+ List<StorageDirectory> errorSDs =
|
|
|
+ Collections.synchronizedList(new ArrayList<StorageDirectory>());
|
|
|
|
|
|
// mv current -> lastcheckpoint.tmp
|
|
|
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
|
|
@@ -1501,17 +1560,18 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ List<Thread> saveThreads = new ArrayList<Thread>();
|
|
|
// save images into current
|
|
|
for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
|
|
|
it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
|
- try {
|
|
|
- saveCurrent(sd);
|
|
|
- } catch(IOException ie) {
|
|
|
- LOG.error("Unable to save image for " + sd.getRoot(), ie);
|
|
|
- errorSDs.add(sd);
|
|
|
- }
|
|
|
+ FSImageSaver saver = new FSImageSaver(sd, errorSDs);
|
|
|
+ Thread saveThread = new Thread(saver, saver.toString());
|
|
|
+ saveThreads.add(saveThread);
|
|
|
+ saveThread.start();
|
|
|
}
|
|
|
+ waitForThreads(saveThreads);
|
|
|
+ saveThreads.clear();
|
|
|
|
|
|
// -NOTE-
|
|
|
// If NN has image-only and edits-only storage directories and fails here
|
|
@@ -1522,18 +1582,17 @@ public class FSImage extends Storage {
|
|
|
// to the old state contained in their lastcheckpoint.tmp.
|
|
|
// The edits directories should be discarded during startup because their
|
|
|
// checkpointTime is older than that of image directories.
|
|
|
-
|
|
|
// recreate edits in current
|
|
|
for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
|
|
|
it.hasNext();) {
|
|
|
- StorageDirectory sd = it.next();
|
|
|
- try {
|
|
|
- saveCurrent(sd);
|
|
|
- } catch(IOException ie) {
|
|
|
- LOG.error("Unable to save edits for " + sd.getRoot(), ie);
|
|
|
- errorSDs.add(sd);
|
|
|
- }
|
|
|
+ final StorageDirectory sd = it.next();
|
|
|
+ FSImageSaver saver = new FSImageSaver(sd, errorSDs);
|
|
|
+ Thread saveThread = new Thread(saver, saver.toString());
|
|
|
+ saveThreads.add(saveThread);
|
|
|
+ saveThread.start();
|
|
|
}
|
|
|
+ waitForThreads(saveThreads);
|
|
|
+
|
|
|
// mv lastcheckpoint.tmp -> previous.checkpoint
|
|
|
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
@@ -1711,6 +1770,7 @@ public class FSImage extends Storage {
|
|
|
int nameLen = name.position();
|
|
|
out.writeShort(nameLen);
|
|
|
out.write(name.array(), name.arrayOffset(), nameLen);
|
|
|
+ FsPermission filePerm = FILE_PERM.get();
|
|
|
if (node.isDirectory()) {
|
|
|
out.writeShort(0); // replication
|
|
|
out.writeLong(node.getModificationTime());
|
|
@@ -1719,10 +1779,10 @@ public class FSImage extends Storage {
|
|
|
out.writeInt(-1); // # of blocks
|
|
|
out.writeLong(node.getNsQuota());
|
|
|
out.writeLong(node.getDsQuota());
|
|
|
- FILE_PERM.fromShort(node.getFsPermissionShort());
|
|
|
+ filePerm.fromShort(node.getFsPermissionShort());
|
|
|
PermissionStatus.write(out, node.getUserName(),
|
|
|
node.getGroupName(),
|
|
|
- FILE_PERM);
|
|
|
+ filePerm);
|
|
|
} else if (node.isLink()) {
|
|
|
out.writeShort(0); // replication
|
|
|
out.writeLong(0); // modification time
|
|
@@ -1730,10 +1790,10 @@ public class FSImage extends Storage {
|
|
|
out.writeLong(0); // preferred block size
|
|
|
out.writeInt(-2); // # of blocks
|
|
|
Text.writeString(out, ((INodeSymlink)node).getLinkValue());
|
|
|
- FILE_PERM.fromShort(node.getFsPermissionShort());
|
|
|
+ filePerm.fromShort(node.getFsPermissionShort());
|
|
|
PermissionStatus.write(out, node.getUserName(),
|
|
|
node.getGroupName(),
|
|
|
- FILE_PERM);
|
|
|
+ filePerm);
|
|
|
} else {
|
|
|
INodeFile fileINode = (INodeFile)node;
|
|
|
out.writeShort(fileINode.getReplication());
|
|
@@ -1744,10 +1804,10 @@ public class FSImage extends Storage {
|
|
|
out.writeInt(blocks.length);
|
|
|
for (Block blk : blocks)
|
|
|
blk.write(out);
|
|
|
- FILE_PERM.fromShort(fileINode.getFsPermissionShort());
|
|
|
+ filePerm.fromShort(fileINode.getFsPermissionShort());
|
|
|
PermissionStatus.write(out, fileINode.getUserName(),
|
|
|
fileINode.getGroupName(),
|
|
|
- FILE_PERM);
|
|
|
+ filePerm);
|
|
|
}
|
|
|
}
|
|
|
|