@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -54,7 +56,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -74,9 +78,9 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
@@ -94,11 +98,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -109,6 +108,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -228,16 +229,28 @@ public class DataNode extends Configured
   // For InterDataNodeProtocol
   public Server ipcServer;
 
+  private SecureResources secureResources = null;
+
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
    */
   DataNode(final Configuration conf,
            final AbstractList<File> dataDirs) throws IOException {
+    this(conf, dataDirs, null);
+  }
+
+  /**
+   * Start a Datanode with specified server sockets for secure environments
+   * where they are run with privileged ports and injected from a higher
+   * level of capability.
+   */
+  DataNode(final Configuration conf,
+           final AbstractList<File> dataDirs, final SecureResources resources) throws IOException {
     this(conf, dataDirs, (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
                        DatanodeProtocol.versionID,
                        NameNode.getServiceAddress(conf, true),
-                       conf));
+                       conf), resources);
   }
 
   /**
@@ -246,13 +259,13 @@ public class DataNode extends Configured
    */
   DataNode(final Configuration conf,
            final AbstractList<File> dataDirs,
-           final DatanodeProtocol namenode) throws IOException {
+           final DatanodeProtocol namenode, final SecureResources resources) throws IOException {
     super(conf);
 
     DataNode.setDataNode(this);
 
     try {
-      startDataNode(conf, dataDirs, namenode);
+      startDataNode(conf, dataDirs, namenode, resources);
     } catch (IOException ie) {
       shutdown();
       throw ie;
@@ -271,8 +284,14 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf,
                      AbstractList<File> dataDirs,
-                     DatanodeProtocol namenode
+                     DatanodeProtocol namenode, SecureResources resources
                      ) throws IOException {
+    if(UserGroupInformation.isSecurityEnabled() && resources == null)
+      throw new RuntimeException("Cannot start secure cluster without " +
+          "privileged resources.");
+
+    this.secureResources = resources;
+
     // use configured nameserver & interface to get local hostname
     if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
       machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
@@ -294,8 +313,7 @@ public class DataNode extends Configured
                                        true);
     this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
                                        DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    InetSocketAddress socAddr = NetUtils.createSocketAddr(
-        conf.get("dfs.datanode.address", "0.0.0.0:50010"));
+    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
     int tmpPort = socAddr.getPort();
     storage = new DataStorage();
     // construct registration
@@ -336,10 +354,15 @@ public class DataNode extends Configured
     }
 
 
-    // find free port
-    ServerSocket ss = (socketWriteTimeout > 0) ?
+    // find free port or use privileged port provided
+    ServerSocket ss;
+    if(secureResources == null) {
+      ss = (socketWriteTimeout > 0) ?
           ServerSocketChannel.open().socket() : new ServerSocket();
-    Server.bind(ss, socAddr, 0);
+      Server.bind(ss, socAddr, 0);
+    } else {
+      ss = resources.getStreamingSocket();
+    }
     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
     // adjust machine name with the actual port
     tmpPort = ss.getLocalPort();
@@ -379,12 +402,14 @@ public class DataNode extends Configured
     }
 
     //create a servlet to serve full-file content
-    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
-        conf.get("dfs.datanode.http.address", "0.0.0.0:50075"));
+    InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
     String infoHost = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
-        tmpInfoPort == 0, conf);
+    this.infoServer = (secureResources == null)
+        ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
+            conf)
+        : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
+            conf, secureResources.getListener());
     LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
     if (conf.getBoolean("dfs.https.enable", false)) {
       boolean needClientAuth = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
@@ -441,6 +466,14 @@ public class DataNode extends Configured
     }
   }
 
+  /**
+   * Determine the http server's effective address.
+   */
+  public static InetSocketAddress getInfoAddr(Configuration conf) {
+    return NetUtils.createSocketAddr(
+        conf.get("dfs.datanode.http.address", "0.0.0.0:50075"));
+  }
+
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
@@ -1373,6 +1406,15 @@ public class DataNode extends Configured
    */
   public static DataNode instantiateDataNode(String args[],
                                       Configuration conf) throws IOException {
+    return instantiateDataNode(args, conf, null);
+  }
+
+  /** Instantiate a single datanode object, along with its secure resources.
+   * This must be run by invoking {@link DataNode#runDatanodeDaemon(DataNode)}
+   * subsequently.
+   */
+  public static DataNode instantiateDataNode(String args[], Configuration conf,
+      SecureResources resources) throws IOException {
     if (conf == null)
       conf = new HdfsConfiguration();
 
@@ -1394,10 +1436,10 @@ public class DataNode extends Configured
     Collection<URI> dataDirs = getStorageDirs(conf);
     dnThreadName = "DataNode: [" +
                     StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
-    UserGroupInformation.setConfiguration(conf); 
+    UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
         DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
-    return makeInstance(dataDirs, conf);
+    return makeInstance(dataDirs, conf, resources);
   }
 
   static Collection<URI> getStorageDirs(Configuration conf) {
@@ -1411,7 +1453,16 @@ public class DataNode extends Configured
    */
   public static DataNode createDataNode(String args[],
                                  Configuration conf) throws IOException {
-    DataNode dn = instantiateDataNode(args, conf);
+    return createDataNode(args, conf, null);
+  }
+
+  /** Instantiate & start a single datanode daemon and wait for it to finish.
+   *  If this thread is specifically interrupted, it will stop waiting.
+   */
+  @InterfaceAudience.Private
+  public static DataNode createDataNode(String args[], Configuration conf,
+      SecureResources resources) throws IOException {
+    DataNode dn = instantiateDataNode(args, conf, resources);
     runDatanodeDaemon(dn);
     return dn;
   }
@@ -1431,12 +1482,13 @@ public class DataNode extends Configured
    * @param dataDirs List of directories, where the new DataNode instance should
    * keep its files.
    * @param conf Configuration instance to use.
+   * @param resources Secure resources needed to run under Kerberos
    * @return DataNode instance for given list of data dirs and conf, or null if
    * no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf)
-      throws IOException {
+  static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf,
+      SecureResources resources) throws IOException {
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(
         conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
@@ -1444,7 +1496,7 @@ public class DataNode extends Configured
     ArrayList<File> dirs = getDataDirsFromURIs(dataDirs, localFS, permission);
 
     if (dirs.size() > 0) {
-      return new DataNode(conf, dirs);
+      return new DataNode(conf, dirs, resources);
     }
     LOG.error("All directories in "
         + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + " are invalid.");
@@ -1551,12 +1603,11 @@ public class DataNode extends Configured
     return data;
   }
 
-  /**
-   */
-  public static void main(String args[]) {
+
+  public static void secureMain(String args[], SecureResources resources) {
     try {
       StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
-      DataNode datanode = createDataNode(args, null);
+      DataNode datanode = createDataNode(args, null, resources);
       if (datanode != null)
         datanode.join();
     } catch (Throwable e) {
@@ -1564,6 +1615,10 @@ public class DataNode extends Configured
       System.exit(-1);
     }
   }
+
+  public static void main(String args[]) {
+    secureMain(args, null);
+  }
 
   public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
@@ -1827,4 +1882,10 @@ public class DataNode extends Configured
 
     return data.getReplicaVisibleLength(block);
   }
+
+  // Determine a Datanode's streaming address
+  public static InetSocketAddress getStreamingAddr(Configuration conf) {
+    return NetUtils.createSocketAddr(
+        conf.get("dfs.datanode.address", "0.0.0.0:50010"));
+  }
 }
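
For context: the SecureResources injected above are expected to come from a privileged starter (SecureDataNodeStarter, not part of this diff) that binds the DataNode's ports while it still holds root capability and only then lets user-level code run. A minimal sketch of such a starter follows; the SecureResources constructor shape and the Jetty SelectChannelConnector listener type are assumptions inferred from the getStreamingSocket()/getListener() calls in the hunks above, and the class name is hypothetical.

import java.net.ServerSocket;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.mortbay.jetty.nio.SelectChannelConnector;

public class PrivilegedDataNodeStarter {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();

    // Bind the data streaming socket while still privileged (port < 1024).
    ServerSocket streamingSocket = new ServerSocket();
    streamingSocket.bind(DataNode.getStreamingAddr(conf));

    // Open (but do not start) the listener for the info server.
    SelectChannelConnector listener = new SelectChannelConnector();
    listener.setHost(DataNode.getInfoAddr(conf).getHostName());
    listener.setPort(DataNode.getInfoAddr(conf).getPort());
    listener.open();

    // Privileges can be dropped here (e.g. by jsvc) before user-level
    // DataNode code runs; the already-bound sockets are handed over intact.
    DataNode.secureMain(args, new SecureResources(streamingSocket, listener));
  }
}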