|
@@ -22,6 +22,7 @@ import java.io.IOException;
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
@@ -40,6 +41,8 @@ import org.apache.hadoop.ipc.RPC;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
|
+import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
|
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.util.Daemon;
|
|
import org.apache.hadoop.util.Daemon;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
@@ -70,6 +73,7 @@ public class SecondaryNameNode implements Runnable {
|
|
private volatile boolean shouldRun;
|
|
private volatile boolean shouldRun;
|
|
private HttpServer infoServer;
|
|
private HttpServer infoServer;
|
|
private int infoPort;
|
|
private int infoPort;
|
|
|
|
+ private int imagePort;
|
|
private String infoBindAddress;
|
|
private String infoBindAddress;
|
|
|
|
|
|
private Collection<File> checkpointDirs;
|
|
private Collection<File> checkpointDirs;
|
|
@@ -129,7 +133,7 @@ public class SecondaryNameNode implements Runnable {
|
|
/**
|
|
/**
|
|
* Initialize SecondaryNameNode.
|
|
* Initialize SecondaryNameNode.
|
|
*/
|
|
*/
|
|
- private void initialize(Configuration conf) throws IOException {
|
|
|
|
|
|
+ private void initialize(final Configuration conf) throws IOException {
|
|
// initiate Java VM metrics
|
|
// initiate Java VM metrics
|
|
JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
|
|
JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
|
|
|
|
|
|
@@ -156,25 +160,67 @@ public class SecondaryNameNode implements Runnable {
|
|
checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
|
|
checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
|
|
|
|
|
|
// initialize the webserver for uploading files.
|
|
// initialize the webserver for uploading files.
|
|
- String infoAddr =
|
|
|
|
- NetUtils.getServerAddress(conf,
|
|
|
|
- "dfs.secondary.info.bindAddress",
|
|
|
|
- "dfs.secondary.info.port",
|
|
|
|
- "dfs.secondary.http.address");
|
|
|
|
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
|
|
|
|
- infoBindAddress = infoSocAddr.getHostName();
|
|
|
|
- int tmpInfoPort = infoSocAddr.getPort();
|
|
|
|
- infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
|
|
|
|
- tmpInfoPort == 0, conf);
|
|
|
|
- infoServer.setAttribute("name.system.image", checkpointImage);
|
|
|
|
- this.infoServer.setAttribute("name.conf", conf);
|
|
|
|
- infoServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
|
|
|
|
- infoServer.start();
|
|
|
|
-
|
|
|
|
|
|
+ // Kerberized SSL servers must be run from the host principal...
|
|
|
|
+ DFSUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY);
|
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
|
+ try {
|
|
|
|
+ infoServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public HttpServer run() throws IOException, InterruptedException {
|
|
|
|
+ LOG.info("Starting web server as: " +
|
|
|
|
+ UserGroupInformation.getLoginUser().getUserName());
|
|
|
|
+
|
|
|
|
+ String infoAddr =
|
|
|
|
+ NetUtils.getServerAddress(conf,
|
|
|
|
+ "dfs.secondary.info.bindAddress",
|
|
|
|
+ "dfs.secondary.info.port",
|
|
|
|
+ "dfs.secondary.http.address");
|
|
|
|
+
|
|
|
|
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
|
|
|
|
+ infoBindAddress = infoSocAddr.getHostName();
|
|
|
|
+ int tmpInfoPort = infoSocAddr.getPort();
|
|
|
|
+ infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
|
|
|
|
+ tmpInfoPort == 0, conf);
|
|
|
|
+
|
|
|
|
+ if(UserGroupInformation.isSecurityEnabled()) {
|
|
|
|
+ System.setProperty("https.cipherSuites",
|
|
|
|
+ Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES[0]);
|
|
|
|
+ InetSocketAddress secInfoSocAddr =
|
|
|
|
+ NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.get(
|
|
|
|
+ "dfs.secondary.https.port", infoBindAddress + ":" + 0));
|
|
|
|
+ imagePort = secInfoSocAddr.getPort();
|
|
|
|
+ infoServer.addSslListener(secInfoSocAddr, conf, false, true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ infoServer.setAttribute("name.system.image", checkpointImage);
|
|
|
|
+ infoServer.setAttribute("name.conf", conf);
|
|
|
|
+ infoServer.addInternalServlet("getimage", "/getimage",
|
|
|
|
+ GetImageServlet.class, true);
|
|
|
|
+ infoServer.start();
|
|
|
|
+ return infoServer;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ } finally {
|
|
|
|
+ // Go back to being the correct Namenode principal
|
|
|
|
+ LOG.info("Web server init done, returning to: " +
|
|
|
|
+ UserGroupInformation.getLoginUser().getUserName());
|
|
|
|
+ DFSUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY);
|
|
|
|
+
|
|
|
|
+ }
|
|
// The web-server port can be ephemeral... ensure we have the correct info
|
|
// The web-server port can be ephemeral... ensure we have the correct info
|
|
|
|
+
|
|
infoPort = infoServer.getPort();
|
|
infoPort = infoServer.getPort();
|
|
|
|
+ if(!UserGroupInformation.isSecurityEnabled())
|
|
|
|
+ imagePort = infoPort;
|
|
|
|
+
|
|
conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort);
|
|
conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort);
|
|
LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
|
|
LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
|
|
|
|
+ LOG.info("Secondary image servlet up at: " + infoBindAddress + ":" + imagePort);
|
|
LOG.warn("Checkpoint Period :" + checkpointPeriod + " secs " +
|
|
LOG.warn("Checkpoint Period :" + checkpointPeriod + " secs " +
|
|
"(" + checkpointPeriod/60 + " min)");
|
|
"(" + checkpointPeriod/60 + " min)");
|
|
LOG.warn("Log Size Trigger :" + checkpointSize + " bytes " +
|
|
LOG.warn("Log Size Trigger :" + checkpointSize + " bytes " +
|
|
@@ -250,36 +296,47 @@ public class SecondaryNameNode implements Runnable {
|
|
* files from the name-node.
|
|
* files from the name-node.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private void downloadCheckpointFiles(CheckpointSignature sig
|
|
|
|
|
|
+ private void downloadCheckpointFiles(final CheckpointSignature sig
|
|
) throws IOException {
|
|
) throws IOException {
|
|
-
|
|
|
|
- checkpointImage.cTime = sig.cTime;
|
|
|
|
- checkpointImage.checkpointTime = sig.checkpointTime;
|
|
|
|
-
|
|
|
|
- // get fsimage
|
|
|
|
- String fileid = "getimage=1";
|
|
|
|
- File[] srcNames = checkpointImage.getImageFiles();
|
|
|
|
- assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
|
- LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
|
- srcNames[0].length() + " bytes.");
|
|
|
|
-
|
|
|
|
- // get edits file
|
|
|
|
- fileid = "getedit=1";
|
|
|
|
- srcNames = checkpointImage.getEditsFiles();
|
|
|
|
- assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
|
- TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
|
- LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
|
- srcNames[0].length() + " bytes.");
|
|
|
|
-
|
|
|
|
- checkpointImage.checkpointUploadDone();
|
|
|
|
|
|
+ try {
|
|
|
|
+ UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Void run() throws Exception {
|
|
|
|
+ checkpointImage.cTime = sig.cTime;
|
|
|
|
+ checkpointImage.checkpointTime = sig.checkpointTime;
|
|
|
|
+
|
|
|
|
+ // get fsimage
|
|
|
|
+ String fileid = "getimage=1";
|
|
|
|
+ File[] srcNames = checkpointImage.getImageFiles();
|
|
|
|
+ assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
|
+ LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
|
+ srcNames[0].length() + " bytes.");
|
|
|
|
+
|
|
|
|
+ // get edits file
|
|
|
|
+ fileid = "getedit=1";
|
|
|
|
+ srcNames = checkpointImage.getEditsFiles();
|
|
|
|
+ assert srcNames.length > 0 : "No checkpoint targets.";
|
|
|
|
+ TransferFsImage.getFileClient(fsName, fileid, srcNames);
|
|
|
|
+ LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
|
|
|
|
+ srcNames[0].length() + " bytes.");
|
|
|
|
+
|
|
|
|
+ checkpointImage.checkpointUploadDone();
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Copy the new fsimage into the NameNode
|
|
* Copy the new fsimage into the NameNode
|
|
*/
|
|
*/
|
|
private void putFSImage(CheckpointSignature sig) throws IOException {
|
|
private void putFSImage(CheckpointSignature sig) throws IOException {
|
|
- String fileid = "putimage=1&port=" + infoPort +
|
|
|
|
|
|
+ String fileid = "putimage=1&port=" + imagePort +
|
|
"&machine=" +
|
|
"&machine=" +
|
|
InetAddress.getLocalHost().getHostAddress() +
|
|
InetAddress.getLocalHost().getHostAddress() +
|
|
"&token=" + sig.toString();
|
|
"&token=" + sig.toString();
|
|
@@ -295,8 +352,13 @@ public class SecondaryNameNode implements Runnable {
|
|
if (!"hdfs".equals(fsName.getScheme())) {
|
|
if (!"hdfs".equals(fsName.getScheme())) {
|
|
throw new IOException("This is not a DFS");
|
|
throw new IOException("This is not a DFS");
|
|
}
|
|
}
|
|
- return NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
|
|
|
|
- "dfs.info.port", "dfs.http.address");
|
|
|
|
|
|
+ String http = UserGroupInformation.isSecurityEnabled() ? "dfs.https.address"
|
|
|
|
+ : "dfs.http.address";
|
|
|
|
+ String infoAddr = NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
|
|
|
|
+ "dfs.info.port", http);
|
|
|
|
+
|
|
|
|
+ LOG.debug("infoAddr = " + infoAddr);
|
|
|
|
+ return infoAddr;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|