|
@@ -29,12 +29,8 @@ import org.apache.hadoop.net.NetUtils;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
|
-import java.util.Map;
|
|
|
-import javax.servlet.ServletContext;
|
|
|
-import javax.servlet.ServletException;
|
|
|
-import javax.servlet.http.HttpServlet;
|
|
|
-import javax.servlet.http.HttpServletRequest;
|
|
|
-import javax.servlet.http.HttpServletResponse;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
|
|
|
|
/**********************************************************
|
|
@@ -52,11 +48,11 @@ import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
|
**********************************************************/
|
|
|
public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
|
|
|
- public static final Log LOG = LogFactory.getLog(
|
|
|
- "org.apache.hadoop.dfs.NameNode.Secondary");
|
|
|
- private static final String SRC_FS_IMAGE = "srcimage.tmp";
|
|
|
- private static final String FS_EDITS = "edits.tmp";
|
|
|
- private static final String DEST_FS_IMAGE = "destimage.tmp";
|
|
|
+ public static final Log LOG =
|
|
|
+ LogFactory.getLog("org.apache.hadoop.dfs.NameNode.Secondary");
|
|
|
+
|
|
|
+ private String fsName;
|
|
|
+ private CheckpointStorage checkpointImage;
|
|
|
|
|
|
private ClientProtocol namenode;
|
|
|
private Configuration conf;
|
|
@@ -66,12 +62,9 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
private int infoPort;
|
|
|
private String infoBindAddress;
|
|
|
|
|
|
- private File checkpointDir;
|
|
|
+ private Collection<File> checkpointDirs;
|
|
|
private long checkpointPeriod; // in seconds
|
|
|
private long checkpointSize; // size (in MB) of current Edit Log
|
|
|
- private File srcImage;
|
|
|
- private File destImage;
|
|
|
- private File editFile;
|
|
|
|
|
|
/**
|
|
|
* Utility class to facilitate junit test error simulation.
|
|
@@ -103,17 +96,30 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ FSImage getFSImage() {
|
|
|
+ return checkpointImage;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Create a connection to the primary namenode.
|
|
|
*/
|
|
|
- public SecondaryNameNode(Configuration conf) throws IOException {
|
|
|
+ SecondaryNameNode(Configuration conf) throws IOException {
|
|
|
+ try {
|
|
|
+ initialize(conf);
|
|
|
+ } catch(IOException e) {
|
|
|
+ shutdown();
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * Initialize SecondaryNameNode.
|
|
|
+ */
|
|
|
+ private void initialize(Configuration conf) throws IOException {
|
|
|
// initiate Java VM metrics
|
|
|
JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
|
|
|
|
|
|
- //
|
|
|
// Create connection to the namenode.
|
|
|
- //
|
|
|
shouldRun = true;
|
|
|
nameNodeAddr =
|
|
|
NetUtils.createSocketAddr(FileSystem.getDefaultUri(conf).getAuthority());
|
|
@@ -122,9 +128,18 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
(ClientProtocol) RPC.waitForProxy(ClientProtocol.class,
|
|
|
ClientProtocol.versionID, nameNodeAddr, conf);
|
|
|
|
|
|
- //
|
|
|
+ // initialize checkpoint directories
|
|
|
+ fsName = getInfoServer();
|
|
|
+ checkpointDirs = FSImage.getCheckpointDirs(conf,
|
|
|
+ "/tmp/hadoop/dfs/namesecondary");
|
|
|
+ checkpointImage = new CheckpointStorage();
|
|
|
+ checkpointImage.recoverCreate(checkpointDirs);
|
|
|
+
|
|
|
+ // Initialize other scheduling parameters from the configuration
|
|
|
+ checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
|
|
|
+ checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
|
|
|
+
|
|
|
// initialize the webserver for uploading files.
|
|
|
- //
|
|
|
String infoAddr =
|
|
|
NetUtils.getServerAddress(conf,
|
|
|
"dfs.secondary.info.bindAddress",
|
|
@@ -133,9 +148,9 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
|
|
|
infoBindAddress = infoSocAddr.getHostName();
|
|
|
int tmpInfoPort = infoSocAddr.getPort();
|
|
|
- infoServer = new StatusHttpServer("dfs", infoBindAddress, tmpInfoPort,
|
|
|
+ infoServer = new StatusHttpServer("secondary", infoBindAddress, tmpInfoPort,
|
|
|
tmpInfoPort == 0);
|
|
|
- infoServer.setAttribute("name.secondary", this);
|
|
|
+ infoServer.setAttribute("name.system.image", checkpointImage);
|
|
|
this.infoServer.setAttribute("name.conf", conf);
|
|
|
infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
|
|
|
infoServer.start();
|
|
@@ -144,17 +159,6 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
infoPort = infoServer.getPort();
|
|
|
conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort);
|
|
|
LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
|
|
|
-
|
|
|
- //
|
|
|
- // Initialize other scheduling parameters from the configuration
|
|
|
- //
|
|
|
- String[] dirName = conf.getStrings("fs.checkpoint.dir");
|
|
|
- checkpointDir = new File(dirName[0]);
|
|
|
- checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
|
|
|
- checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
|
|
|
- doSetup();
|
|
|
-
|
|
|
- LOG.warn("Checkpoint Directory:" + checkpointDir);
|
|
|
LOG.warn("Checkpoint Period :" + checkpointPeriod + " secs " +
|
|
|
"(" + checkpointPeriod/60 + " min)");
|
|
|
LOG.warn("Log Size Trigger :" + checkpointSize + " bytes " +
|
|
@@ -168,26 +172,15 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
public void shutdown() {
|
|
|
shouldRun = false;
|
|
|
try {
|
|
|
- infoServer.stop();
|
|
|
- } catch (Exception e) {
|
|
|
+ if (infoServer != null) infoServer.stop();
|
|
|
+ } catch(InterruptedException ie) {
|
|
|
+ LOG.warn(StringUtils.stringifyException(ie));
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (checkpointImage != null) checkpointImage.close();
|
|
|
+ } catch(IOException e) {
|
|
|
+ LOG.warn(StringUtils.stringifyException(e));
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private void doSetup() throws IOException {
|
|
|
- //
|
|
|
- // Create the checkpoint directory if needed.
|
|
|
- //
|
|
|
- checkpointDir.mkdirs();
|
|
|
- srcImage = new File(checkpointDir, SRC_FS_IMAGE);
|
|
|
- destImage = new File(checkpointDir, DEST_FS_IMAGE);
|
|
|
- editFile = new File(checkpointDir, FS_EDITS);
|
|
|
- srcImage.delete();
|
|
|
- destImage.delete();
|
|
|
- editFile.delete();
|
|
|
- }
|
|
|
-
|
|
|
- File getNewImage() {
|
|
|
- return destImage;
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -224,11 +217,11 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
lastCheckpointTime = now;
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
- LOG.error("Exception in doCheckpoint:");
|
|
|
+ LOG.error("Exception in doCheckpoint: ");
|
|
|
LOG.error(StringUtils.stringifyException(e));
|
|
|
e.printStackTrace();
|
|
|
} catch (Throwable e) {
|
|
|
- LOG.error("Throwable Exception in doCheckpoint:");
|
|
|
+ LOG.error("Throwable Exception in doCheckpoint: ");
|
|
|
LOG.error(StringUtils.stringifyException(e));
|
|
|
e.printStackTrace();
|
|
|
Runtime.getRuntime().exit(-1);
|
|
@@ -237,41 +230,48 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * get the current fsimage from Namenode.
|
|
|
+ * Download <code>fsimage</code> and <code>edits</code>
|
|
|
+ * files from the name-node.
|
|
|
+ * @throws IOException
|
|
|
*/
|
|
|
- private void getFSImage() throws IOException {
|
|
|
- String fsName = getInfoServer();
|
|
|
- String fileid = "getimage=1";
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, srcImage);
|
|
|
- LOG.info("Downloaded file " + srcImage + " size " +
|
|
|
- srcImage.length() + " bytes.");
|
|
|
- }
|
|
|
+ private void downloadCheckpointFiles(CheckpointSignature sig
|
|
|
+ ) throws IOException {
|
|
|
+
|
|
|
+ checkpointImage.cTime = sig.cTime;
|
|
|
+ checkpointImage.checkpointTime = sig.checkpointTime;
|
|
|
|
|
|
- /**
|
|
|
- * get the old edits file from the NameNode
|
|
|
- */
|
|
|
- private void getFSEdits() throws IOException {
|
|
|
- String fsName = getInfoServer();
|
|
|
- String fileid = "getedit=1";
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, editFile);
|
|
|
- LOG.info("Downloaded file " + editFile + " size " +
|
|
|
- editFile.length() + " bytes.");
|
|
|
+ // get fsimage
|
|
|
+ String fileid = "getimage=1";
|
|
|
+ File[] srcNames = checkpointImage.getImageFiles();
|
|
|
+ assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
+ LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
+ srcNames[0].length() + " bytes.");
|
|
|
+
|
|
|
+ // get edits file
|
|
|
+ fileid = "getedit=1";
|
|
|
+ srcNames = checkpointImage.getEditsFiles();
|
|
|
+ assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
+ LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
+ srcNames[0].length() + " bytes.");
|
|
|
+
|
|
|
+ checkpointImage.checkpointUploadDone();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Copy the new fsimage into the NameNode
|
|
|
*/
|
|
|
- private void putFSImage(long token) throws IOException {
|
|
|
- String fsName = getInfoServer();
|
|
|
+ private void putFSImage(CheckpointSignature sig) throws IOException {
|
|
|
String fileid = "putimage=1&port=" + infoPort +
|
|
|
"&machine=" +
|
|
|
InetAddress.getLocalHost().getHostAddress() +
|
|
|
- "&token=" + token;
|
|
|
+ "&token=" + sig.toString();
|
|
|
LOG.info("Posted URL " + fsName + fileid);
|
|
|
TransferFsImage.getFileClient(fsName, fileid, (File[])null);
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
+ /**
|
|
|
* Returns the Jetty server that the Namenode is listening on.
|
|
|
*/
|
|
|
private String getInfoServer() throws IOException {
|
|
@@ -283,66 +283,62 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
"dfs.info.port", "dfs.http.address");
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
+ /**
|
|
|
* Create a new checkpoint
|
|
|
*/
|
|
|
void doCheckpoint() throws IOException {
|
|
|
|
|
|
- //
|
|
|
// Do the required initialization of the merge work area.
|
|
|
- //
|
|
|
- doSetup();
|
|
|
+ startCheckpoint();
|
|
|
|
|
|
- //
|
|
|
// Tell the namenode to start logging transactions in a new edit file
|
|
|
// Retuns a token that would be used to upload the merged image.
|
|
|
- //
|
|
|
- long token = namenode.rollEditLog();
|
|
|
+ CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();
|
|
|
|
|
|
- //
|
|
|
// error simulation code for junit test
|
|
|
- //
|
|
|
if (ErrorSimulator.getErrorSimulation(0)) {
|
|
|
throw new IOException("Simulating error0 " +
|
|
|
"after creating edits.new");
|
|
|
}
|
|
|
|
|
|
- getFSImage(); // Fetch fsimage
|
|
|
- getFSEdits(); // Fetch edist
|
|
|
- doMerge(); // Do the merge
|
|
|
+ downloadCheckpointFiles(sig); // Fetch fsimage and edits
|
|
|
+ doMerge(sig); // Do the merge
|
|
|
|
|
|
//
|
|
|
// Upload the new image into the NameNode. Then tell the Namenode
|
|
|
// to make this new uploaded image as the most current image.
|
|
|
//
|
|
|
- putFSImage(token);
|
|
|
+ putFSImage(sig);
|
|
|
|
|
|
- //
|
|
|
// error simulation code for junit test
|
|
|
- //
|
|
|
if (ErrorSimulator.getErrorSimulation(1)) {
|
|
|
throw new IOException("Simulating error1 " +
|
|
|
"after uploading new image to NameNode");
|
|
|
}
|
|
|
|
|
|
namenode.rollFsImage();
|
|
|
+ checkpointImage.endCheckpoint();
|
|
|
|
|
|
- LOG.warn("Checkpoint done. Image Size:" + srcImage.length() +
|
|
|
- " Edit Size:" + editFile.length() +
|
|
|
- " New Image Size:" + destImage.length());
|
|
|
+ LOG.warn("Checkpoint done. New Image Size: "
|
|
|
+ + checkpointImage.getFsImageName().length());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startCheckpoint() throws IOException {
|
|
|
+ checkpointImage.unlockAll();
|
|
|
+ checkpointImage.getEditLog().close();
|
|
|
+ checkpointImage.recoverCreate(checkpointDirs);
|
|
|
+ checkpointImage.startCheckpoint();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * merges SRC_FS_IMAGE with FS_EDITS and writes the output into
|
|
|
- * DEST_FS_IMAGE
|
|
|
+ * Merge downloaded image and edits and write the new image into
|
|
|
+ * current storage directory.
|
|
|
*/
|
|
|
- private void doMerge() throws IOException {
|
|
|
+ private void doMerge(CheckpointSignature sig) throws IOException {
|
|
|
FSNamesystem namesystem =
|
|
|
- new FSNamesystem(new FSImage(checkpointDir), conf);
|
|
|
- FSImage fsImage = namesystem.dir.fsImage;
|
|
|
- fsImage.loadFSImage(srcImage);
|
|
|
- fsImage.getEditLog().loadFSEdits(editFile);
|
|
|
- fsImage.saveFSImage(destImage);
|
|
|
+ new FSNamesystem(checkpointImage, conf);
|
|
|
+ assert namesystem.dir.fsImage == checkpointImage;
|
|
|
+ checkpointImage.doMerge(sig);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -446,37 +442,6 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This class is used in Namesystem's jetty to retrieve a file.
|
|
|
- * Typically used by the Secondary NameNode to retrieve image and
|
|
|
- * edit file for periodic checkpointing.
|
|
|
- */
|
|
|
- public static class GetImageServlet extends HttpServlet {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public void doGet(HttpServletRequest request,
|
|
|
- HttpServletResponse response
|
|
|
- ) throws ServletException, IOException {
|
|
|
- Map<String,String[]> pmap = request.getParameterMap();
|
|
|
- try {
|
|
|
- ServletContext context = getServletContext();
|
|
|
- SecondaryNameNode nn = (SecondaryNameNode)
|
|
|
- context.getAttribute("name.secondary");
|
|
|
- TransferFsImage ff = new TransferFsImage(pmap, request, response);
|
|
|
- if (ff.getImage()) {
|
|
|
- TransferFsImage.getFileServer(response.getOutputStream(),
|
|
|
- nn.getNewImage());
|
|
|
- }
|
|
|
- LOG.info("New Image " + nn.getNewImage() + " retrieved by Namenode.");
|
|
|
- } catch (Exception ie) {
|
|
|
- String errMsg = "GetImage failed. " + StringUtils.stringifyException(ie);
|
|
|
- response.sendError(HttpServletResponse.SC_GONE, errMsg);
|
|
|
- throw new IOException(errMsg);
|
|
|
- } finally {
|
|
|
- response.getOutputStream().close();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* main() has some simple utility methods.
|
|
|
* @param argv Command line parameters.
|
|
@@ -495,4 +460,116 @@ public class SecondaryNameNode implements FSConstants, Runnable {
|
|
|
Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf));
|
|
|
checkpointThread.start();
|
|
|
}
|
|
|
+
|
|
|
+ static class CheckpointStorage extends FSImage {
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ CheckpointStorage() throws IOException {
|
|
|
+ super();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ boolean isConversionNeeded(StorageDirectory sd) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Analyze checkpoint directories.
|
|
|
+ * Create directories if they do not exist.
|
|
|
+ * Recover from an unsuccessful checkpoint if necessary.
|
|
|
+ *
|
|
|
+ * @param dataDirs
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ void recoverCreate(Collection<File> dataDirs) throws IOException {
|
|
|
+ this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
|
|
+ for(File dataDir : dataDirs) {
|
|
|
+ boolean isAccessible = true;
|
|
|
+ try { // create directories if don't exist yet
|
|
|
+ if(!dataDir.mkdirs()) {
|
|
|
+ // do nothing, directory is already created
|
|
|
+ }
|
|
|
+ } catch(SecurityException se) {
|
|
|
+ isAccessible = false;
|
|
|
+ }
|
|
|
+ if(!isAccessible)
|
|
|
+ throw new InconsistentFSStateException(dataDir,
|
|
|
+ "cannot access checkpoint directory.");
|
|
|
+ StorageDirectory sd = new StorageDirectory(dataDir);
|
|
|
+ StorageState curState;
|
|
|
+ try {
|
|
|
+ curState = sd.analyzeStorage(StartupOption.REGULAR);
|
|
|
+ // sd is locked but not opened
|
|
|
+ switch(curState) {
|
|
|
+ case NON_EXISTENT:
|
|
|
+ // fail if any of the configured checkpoint dirs are inaccessible
|
|
|
+ throw new InconsistentFSStateException(sd.root,
|
|
|
+ "checkpoint directory does not exist or is not accessible.");
|
|
|
+ case NOT_FORMATTED:
|
|
|
+ break; // it's ok since initially there is no current and VERSION
|
|
|
+ case CONVERT:
|
|
|
+ throw new InconsistentFSStateException(sd.root,
|
|
|
+ "not a checkpoint directory.");
|
|
|
+ case NORMAL:
|
|
|
+ break;
|
|
|
+ default: // recovery is possible
|
|
|
+ sd.doRecover(curState);
|
|
|
+ }
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ sd.unlock();
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ // add to the storage list
|
|
|
+ addStorageDir(sd);
|
|
|
+ LOG.warn("Checkpoint directory " + sd.root + " is added.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Prepare directories for a new checkpoint.
|
|
|
+ * <p>
|
|
|
+ * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
|
|
|
+ * and recreate <code>current</code>.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ void startCheckpoint() throws IOException {
|
|
|
+ for(StorageDirectory sd : storageDirs) {
|
|
|
+ File curDir = sd.getCurrentDir();
|
|
|
+ File tmpCkptDir = sd.getLastCheckpointTmp();
|
|
|
+ assert !tmpCkptDir.exists() :
|
|
|
+ tmpCkptDir.getName() + " directory must not exist.";
|
|
|
+ if(curDir.exists()) {
|
|
|
+ // rename current to tmp
|
|
|
+ rename(curDir, tmpCkptDir);
|
|
|
+ }
|
|
|
+ if (!curDir.mkdir())
|
|
|
+ throw new IOException("Cannot create directory " + curDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void endCheckpoint() throws IOException {
|
|
|
+ for(StorageDirectory sd : storageDirs) {
|
|
|
+ File tmpCkptDir = sd.getLastCheckpointTmp();
|
|
|
+ File prevCkptDir = sd.getPreviousCheckpoint();
|
|
|
+ // delete previous dir
|
|
|
+ if (prevCkptDir.exists())
|
|
|
+ deleteDir(prevCkptDir);
|
|
|
+ // rename tmp to previous
|
|
|
+ if (tmpCkptDir.exists())
|
|
|
+ rename(tmpCkptDir, prevCkptDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Merge image and edits, and verify consistency with the signature.
|
|
|
+ */
|
|
|
+ private void doMerge(CheckpointSignature sig) throws IOException {
|
|
|
+ getEditLog().open();
|
|
|
+ StorageDirectory sd = getStorageDir(0);
|
|
|
+ loadFSImage(FSImage.getImageFile(sd, NameNodeFile.IMAGE));
|
|
|
+ loadFSEdits(sd);
|
|
|
+ sig.validateStorageInfo(this);
|
|
|
+ saveFSImage();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|