|
@@ -18,10 +18,12 @@
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
-import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
+import java.security.DigestInputStream;
|
|
|
+import java.security.MessageDigest;
|
|
|
import java.security.PrivilegedAction;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.ArrayList;
|
|
@@ -33,11 +35,12 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
-import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.FSConstants.BUFFER_SIZE;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.http.HttpServer;
|
|
|
+import org.apache.hadoop.io.MD5Hash;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.metrics2.source.JvmMetricsSource;
|
|
@@ -344,7 +347,7 @@ public class SecondaryNameNode implements Runnable {
|
|
|
String fileid = "getimage=1";
|
|
|
File[] srcNames = checkpointImage.getImageFiles();
|
|
|
assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
|
|
|
LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
srcNames[0].length() + " bytes.");
|
|
|
|
|
@@ -352,7 +355,7 @@ public class SecondaryNameNode implements Runnable {
|
|
|
fileid = "getedit=1";
|
|
|
srcNames = checkpointImage.getEditsFiles();
|
|
|
assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
|
|
|
LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
srcNames[0].length() + " bytes.");
|
|
|
|
|
@@ -372,9 +375,35 @@ public class SecondaryNameNode implements Runnable {
|
|
|
private void putFSImage(CheckpointSignature sig) throws IOException {
|
|
|
String fileid = "putimage=1&port=" + imagePort +
|
|
|
"&machine=" + infoBindAddress +
|
|
|
- "&token=" + sig.toString();
|
|
|
+ "&token=" + sig.toString() +
|
|
|
+ "&newChecksum=" + getNewChecksum();
|
|
|
LOG.info("Posted URL " + fsName + fileid);
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, (File[])null);
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calculate the MD5 hash of the newly-merged fsimage.
|
|
|
+ * @return the checksum of the newly-merged fsimage.
|
|
|
+ */
|
|
|
+ MD5Hash getNewChecksum() throws IOException {
|
|
|
+ DigestInputStream imageIn = null;
|
|
|
+ try {
|
|
|
+ MessageDigest digester = MD5Hash.getDigester();
|
|
|
+ imageIn = new DigestInputStream(
|
|
|
+ new FileInputStream(checkpointImage.getFsImageName()), digester);
|
|
|
+ byte[] in = new byte[BUFFER_SIZE];
|
|
|
+ int totalRead = 0;
|
|
|
+ int read = 0;
|
|
|
+ while ((read = imageIn.read(in)) > 0) {
|
|
|
+ totalRead += read;
|
|
|
+ LOG.debug("Computing fsimage checksum. Read " + totalRead + " bytes so far.");
|
|
|
+ }
|
|
|
+ return new MD5Hash(digester.digest());
|
|
|
+ } finally {
|
|
|
+ if (imageIn != null) {
|
|
|
+ imageIn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -399,7 +428,7 @@ public class SecondaryNameNode implements Runnable {
|
|
|
startCheckpoint();
|
|
|
|
|
|
// Tell the namenode to start logging transactions in a new edit file
|
|
|
- // Retuns a token that would be used to upload the merged image.
|
|
|
+ // Retuns a token that should be used to verify the downloaded image file.
|
|
|
CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();
|
|
|
|
|
|
// error simulation code for junit test
|
|
@@ -408,13 +437,11 @@ public class SecondaryNameNode implements Runnable {
|
|
|
"after creating edits.new");
|
|
|
}
|
|
|
|
|
|
- downloadCheckpointFiles(sig); // Fetch fsimage and edits
|
|
|
- doMerge(sig); // Do the merge
|
|
|
-
|
|
|
- //
|
|
|
- // Upload the new image into the NameNode. Then tell the Namenode
|
|
|
- // to make this new uploaded image as the most current image.
|
|
|
- //
|
|
|
+ downloadCheckpointFiles(sig); // Fetch fsimage and edits
|
|
|
+ doMerge(sig); // Do the merge
|
|
|
+
|
|
|
+ // Upload the new image into the NameNode, providing the new checksum for
|
|
|
+ // the image file.
|
|
|
putFSImage(sig);
|
|
|
|
|
|
// error simulation code for junit test
|
|
@@ -423,6 +450,8 @@ public class SecondaryNameNode implements Runnable {
|
|
|
"after uploading new image to NameNode");
|
|
|
}
|
|
|
|
|
|
+ // Then tell the Namenode to make this new uploaded image as the most
|
|
|
+ // current image.
|
|
|
namenode.rollFsImage();
|
|
|
checkpointImage.endCheckpoint();
|
|
|
|