|
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
|
|
@@ -220,6 +221,8 @@ public class DataNode extends Configured
|
|
|
// For InterDataNodeProtocol
|
|
|
public Server ipcServer;
|
|
|
|
|
|
+ private SecureResources secureResources = null;
|
|
|
+
|
|
|
/**
|
|
|
* Current system time.
|
|
|
* @return current time in msec.
|
|
@@ -234,6 +237,16 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
DataNode(final Configuration conf,
|
|
|
final AbstractList<File> dataDirs) throws IOException {
|
|
|
+ this(conf, dataDirs, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start a Datanode with specified server sockets for secure environments
|
|
|
+ * where they are run with privileged ports and injected from a higher
|
|
|
+ * level of capability
|
|
|
+ */
|
|
|
+ DataNode(final Configuration conf,
|
|
|
+ final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
|
|
|
super(conf);
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
|
|
@@ -242,11 +255,11 @@ public class DataNode extends Configured
|
|
|
datanodeObject = this;
|
|
|
|
|
|
try {
|
|
|
- startDataNode(conf, dataDirs);
|
|
|
+ startDataNode(conf, dataDirs, resources);
|
|
|
} catch (IOException ie) {
|
|
|
shutdown();
|
|
|
throw ie;
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
@@ -261,8 +274,13 @@ public class DataNode extends Configured
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
void startDataNode(Configuration conf,
|
|
|
- AbstractList<File> dataDirs
|
|
|
+ AbstractList<File> dataDirs, SecureResources resources
|
|
|
) throws IOException {
|
|
|
+ if(UserGroupInformation.isSecurityEnabled() && resources == null)
|
|
|
+ throw new RuntimeException("Cannot start secure cluster without " +
|
|
|
+ "privileged resources.");
|
|
|
+
|
|
|
+ this.secureResources = resources;
|
|
|
// use configured nameserver & interface to get local hostname
|
|
|
if (conf.get("slave.host.name") != null) {
|
|
|
machineName = conf.get("slave.host.name");
|
|
@@ -283,12 +301,8 @@ public class DataNode extends Configured
|
|
|
this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
|
|
|
true);
|
|
|
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
|
|
|
- String address =
|
|
|
- NetUtils.getServerAddress(conf,
|
|
|
- "dfs.datanode.bindAddress",
|
|
|
- "dfs.datanode.port",
|
|
|
- "dfs.datanode.address");
|
|
|
- InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
|
|
|
+
|
|
|
+ InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
|
|
|
int tmpPort = socAddr.getPort();
|
|
|
storage = new DataStorage();
|
|
|
// construct registration
|
|
@@ -332,10 +346,15 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
|
|
|
- // find free port
|
|
|
- ServerSocket ss = (socketWriteTimeout > 0) ?
|
|
|
- ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
- Server.bind(ss, socAddr, 0);
|
|
|
+ // find free port or use privileged port provide
|
|
|
+ ServerSocket ss;
|
|
|
+ if(secureResources == null) {
|
|
|
+ ss = (socketWriteTimeout > 0) ?
|
|
|
+ ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
+ Server.bind(ss, socAddr, 0);
|
|
|
+ } else {
|
|
|
+ ss = resources.getStreamingSocket();
|
|
|
+ }
|
|
|
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
// adjust machine name with the actual port
|
|
|
tmpPort = ss.getLocalPort();
|
|
@@ -376,16 +395,13 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
//create a servlet to serve full-file content
|
|
|
- String infoAddr =
|
|
|
- NetUtils.getServerAddress(conf,
|
|
|
- "dfs.datanode.info.bindAddress",
|
|
|
- "dfs.datanode.info.port",
|
|
|
- "dfs.datanode.http.address");
|
|
|
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
|
|
|
+ InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
|
|
|
String infoHost = infoSocAddr.getHostName();
|
|
|
int tmpInfoPort = infoSocAddr.getPort();
|
|
|
- this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
|
|
|
- tmpInfoPort == 0, conf);
|
|
|
+ this.infoServer = (secureResources == null)
|
|
|
+ ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, conf)
|
|
|
+ : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, conf,
|
|
|
+ secureResources.getListener());
|
|
|
if (conf.getBoolean("dfs.https.enable", false)) {
|
|
|
boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
|
|
|
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
|
|
@@ -428,6 +444,18 @@ public class DataNode extends Configured
|
|
|
LOG.info("dnRegistration = " + dnRegistration);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Determine the http server's effective addr
|
|
|
+ */
|
|
|
+ public static InetSocketAddress getInfoAddr(Configuration conf) {
|
|
|
+ String infoAddr = NetUtils.getServerAddress(conf,
|
|
|
+ "dfs.datanode.info.bindAddress",
|
|
|
+ "dfs.datanode.info.port",
|
|
|
+ "dfs.datanode.http.address");
|
|
|
+
|
|
|
+ return NetUtils.createSocketAddr(infoAddr);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Creates either NIO or regular depending on socketWriteTimeout.
|
|
|
*/
|
|
@@ -1322,6 +1350,16 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
public static DataNode instantiateDataNode(String args[],
|
|
|
Configuration conf) throws IOException {
|
|
|
+ return instantiateDataNode(args, conf, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Instantiate a single datanode object. This must be run by invoking
|
|
|
+ * {@link DataNode#runDatanodeDaemon(DataNode)} subsequently.
|
|
|
+ * @param resources Secure resources needed to run under Kerberos
|
|
|
+ */
|
|
|
+ public static DataNode instantiateDataNode(String args[],
|
|
|
+ Configuration conf,
|
|
|
+ SecureResources resources) throws IOException {
|
|
|
if (conf == null)
|
|
|
conf = new Configuration();
|
|
|
if (!parseArguments(args, conf)) {
|
|
@@ -1336,7 +1374,7 @@ public class DataNode extends Configured
|
|
|
String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
|
|
|
dnThreadName = "DataNode: [" +
|
|
|
StringUtils.arrayToString(dataDirs) + "]";
|
|
|
- return makeInstance(dataDirs, conf);
|
|
|
+ return makeInstance(dataDirs, conf, resources);
|
|
|
}
|
|
|
|
|
|
/** Instantiate & Start a single datanode daemon and wait for it to finish.
|
|
@@ -1344,7 +1382,17 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
public static DataNode createDataNode(String args[],
|
|
|
Configuration conf) throws IOException {
|
|
|
- DataNode dn = instantiateDataNode(args, conf);
|
|
|
+ return createDataNode(args, conf, null);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /** Instantiate & Start a single datanode daemon and wait for it to finish.
|
|
|
+ * If this thread is specifically interrupted, it will stop waiting.
|
|
|
+ * LimitedPrivate for creating secure datanodes
|
|
|
+ */
|
|
|
+ public static DataNode createDataNode(String args[],
|
|
|
+ Configuration conf, SecureResources resources) throws IOException {
|
|
|
+ DataNode dn = instantiateDataNode(args, conf, resources);
|
|
|
runDatanodeDaemon(dn);
|
|
|
return dn;
|
|
|
}
|
|
@@ -1364,12 +1412,13 @@ public class DataNode extends Configured
|
|
|
* @param dataDirs List of directories, where the new DataNode instance should
|
|
|
* keep its files.
|
|
|
* @param conf Configuration instance to use.
|
|
|
+ * @param resources Secure resources needed to run under Kerberos
|
|
|
* @return DataNode instance for given list of data dirs and conf, or null if
|
|
|
* no directory from this directory list can be created.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static DataNode makeInstance(String[] dataDirs, Configuration conf)
|
|
|
- throws IOException {
|
|
|
+ public static DataNode makeInstance(String[] dataDirs, Configuration conf,
|
|
|
+ SecureResources resources) throws IOException {
|
|
|
LocalFileSystem localFS = FileSystem.getLocal(conf);
|
|
|
ArrayList<File> dirs = new ArrayList<File>();
|
|
|
FsPermission dataDirPermission =
|
|
@@ -1385,7 +1434,7 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
if (dirs.size() > 0)
|
|
|
- return new DataNode(conf, dirs);
|
|
|
+ return new DataNode(conf, dirs, resources);
|
|
|
LOG.error("All directories in " + DATA_DIR_KEY + " are invalid.");
|
|
|
return null;
|
|
|
}
|
|
@@ -1465,18 +1514,20 @@ public class DataNode extends Configured
|
|
|
return data;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- */
|
|
|
- public static void main(String args[]) {
|
|
|
+ public static void secureMain(String [] args, SecureResources resources) {
|
|
|
try {
|
|
|
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
|
|
|
- DataNode datanode = createDataNode(args, null);
|
|
|
+ DataNode datanode = createDataNode(args, null, resources);
|
|
|
if (datanode != null)
|
|
|
datanode.join();
|
|
|
} catch (Throwable e) {
|
|
|
LOG.error(StringUtils.stringifyException(e));
|
|
|
System.exit(-1);
|
|
|
- }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void main(String args[]) {
|
|
|
+ secureMain(args, null);
|
|
|
}
|
|
|
|
|
|
// InterDataNodeProtocol implementation
|
|
@@ -1735,4 +1786,13 @@ public class DataNode extends Configured
|
|
|
LOG.info(who + " calls recoverBlock(block=" + block
|
|
|
+ ", targets=[" + msg + "])");
|
|
|
}
|
|
|
+
|
|
|
+ public static InetSocketAddress getStreamingAddr(Configuration conf) {
|
|
|
+ String address =
|
|
|
+ NetUtils.getServerAddress(conf,
|
|
|
+ "dfs.datanode.bindAddress",
|
|
|
+ "dfs.datanode.port",
|
|
|
+ "dfs.datanode.address");
|
|
|
+ return NetUtils.createSocketAddr(address);
|
|
|
+ }
|
|
|
}
|