|
@@ -286,26 +286,7 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This method starts the data node with the specified conf.
|
|
|
- *
|
|
|
- * @param conf - the configuration
|
|
|
- * if conf's CONFIG_PROPERTY_SIMULATED property is set
|
|
|
- * then a simulated storage based data node is created.
|
|
|
- *
|
|
|
- * @param dataDirs - only for a non-simulated storage data node
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- void startDataNode(Configuration conf,
|
|
|
- AbstractList<File> dataDirs,
|
|
|
- DatanodeProtocol namenode, SecureResources resources
|
|
|
- ) throws IOException {
|
|
|
- if(UserGroupInformation.isSecurityEnabled() && resources == null)
|
|
|
- throw new RuntimeException("Cannot start secure cluster without " +
|
|
|
- "privileged resources.");
|
|
|
-
|
|
|
- this.secureResources = resources;
|
|
|
-
|
|
|
+ private void initConfig(Configuration conf) throws UnknownHostException {
|
|
|
// use configured nameserver & interface to get local hostname
|
|
|
if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
|
|
|
machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
|
|
@@ -328,72 +309,6 @@ public class DataNode extends Configured
|
|
|
true);
|
|
|
this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
|
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
|
- InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
|
|
|
- int tmpPort = socAddr.getPort();
|
|
|
- storage = new DataStorage();
|
|
|
- // construct registration
|
|
|
- this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
|
|
|
-
|
|
|
- // connect to name node
|
|
|
- this.namenode = namenode;
|
|
|
-
|
|
|
- // get version and id info from the name-node
|
|
|
- NamespaceInfo nsInfo = handshake();
|
|
|
- StartupOption startOpt = getStartupOption(conf);
|
|
|
- assert startOpt != null : "Startup option must be set.";
|
|
|
-
|
|
|
- boolean simulatedFSDataset =
|
|
|
- conf.getBoolean("dfs.datanode.simulateddatastorage", false);
|
|
|
- if (simulatedFSDataset) {
|
|
|
- setNewStorageID(dnRegistration);
|
|
|
- dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
|
|
|
- dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
|
|
|
- dnRegistration.storageInfo.clusterID = nsInfo.clusterID;
|
|
|
- dnRegistration.storageInfo.blockpoolID = nsInfo.blockpoolID;
|
|
|
- // it would have been better to pass storage as a parameter to
|
|
|
- // constructor below - need to augment ReflectionUtils used below.
|
|
|
- conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
|
|
|
- try {
|
|
|
- //Equivalent of following (can't do because Simulated is in test dir)
|
|
|
- // this.data = new SimulatedFSDataset(conf);
|
|
|
- this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
|
|
|
- Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
|
|
|
- } catch (ClassNotFoundException e) {
|
|
|
- throw new IOException(StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- } else { // real storage
|
|
|
- // read storage info, lock data dirs and transition fs state if necessary
|
|
|
- storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
|
|
|
- // adjust
|
|
|
- this.dnRegistration.setStorageInfo(storage);
|
|
|
- // initialize data node internal structure
|
|
|
- this.data = new FSDataset(storage, conf);
|
|
|
- }
|
|
|
-
|
|
|
- // register datanode MXBean
|
|
|
- registerMXBean();
|
|
|
-
|
|
|
- // find free port or use privileged port provide
|
|
|
- ServerSocket ss;
|
|
|
- if(secureResources == null) {
|
|
|
- ss = (socketWriteTimeout > 0) ?
|
|
|
- ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
- Server.bind(ss, socAddr, 0);
|
|
|
- } else {
|
|
|
- ss = resources.getStreamingSocket();
|
|
|
- }
|
|
|
- ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
- // adjust machine name with the actual port
|
|
|
- tmpPort = ss.getLocalPort();
|
|
|
- selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
|
- tmpPort);
|
|
|
- this.dnRegistration.setName(machineName + ":" + tmpPort);
|
|
|
- LOG.info("Opened info server at " + tmpPort);
|
|
|
-
|
|
|
- this.threadGroup = new ThreadGroup("dataXceiverServer");
|
|
|
- this.dataXceiverServer = new Daemon(threadGroup,
|
|
|
- new DataXceiverServer(ss, conf, this));
|
|
|
- this.threadGroup.setDaemon(true); // auto destroy when empty
|
|
|
|
|
|
this.blockReportInterval =
|
|
|
conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
|
|
@@ -405,22 +320,10 @@ public class DataNode extends Configured
|
|
|
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
|
|
|
}
|
|
|
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
|
|
|
-
|
|
|
- //initialize periodic block scanner
|
|
|
- String reason = null;
|
|
|
- if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
|
|
|
- reason = "verification is turned off by configuration";
|
|
|
- } else if ( !(data instanceof FSDataset) ) {
|
|
|
- reason = "verifcation is supported only with FSDataset";
|
|
|
- }
|
|
|
- if ( reason == null ) {
|
|
|
- blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
|
|
|
- } else {
|
|
|
- LOG.info("Periodic Block Verification is disabled because " +
|
|
|
- reason + ".");
|
|
|
- }
|
|
|
-
|
|
|
- //create a servlet to serve full-file content
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startInfoServer(Configuration conf) throws IOException {
|
|
|
+ // create a servlet to serve full-file content
|
|
|
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
|
|
|
String infoHost = infoSocAddr.getHostName();
|
|
|
int tmpInfoPort = infoSocAddr.getPort();
|
|
@@ -456,19 +359,67 @@ public class DataNode extends Configured
|
|
|
this.infoServer.start();
|
|
|
// adjust info port
|
|
|
this.dnRegistration.setInfoPort(this.infoServer.getPort());
|
|
|
- myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
|
|
|
+ throws IOException {
|
|
|
+ // get version and id info from the name-node
|
|
|
+ NamespaceInfo nsInfo = handshake();
|
|
|
|
|
|
- // BlockTokenSecretManager is created here, but it shouldn't be
|
|
|
- // used until it is initialized in register().
|
|
|
- this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
|
|
|
+ StartupOption startOpt = getStartupOption(conf);
|
|
|
+ assert startOpt != null : "Startup option must be set.";
|
|
|
|
|
|
- //init ipc server
|
|
|
+
|
|
|
+ boolean simulatedFSDataset =
|
|
|
+ conf.getBoolean("dfs.datanode.simulateddatastorage", false);
|
|
|
+ if (simulatedFSDataset) {
|
|
|
+ setNewStorageID(dnRegistration);
|
|
|
+ dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
|
|
|
+ dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
|
|
|
+ dnRegistration.storageInfo.clusterID = nsInfo.clusterID;
|
|
|
+ dnRegistration.storageInfo.blockpoolID = nsInfo.blockpoolID;
|
|
|
+ // it would have been better to pass storage as a parameter to
|
|
|
+ // constructor below - need to augment ReflectionUtils used below.
|
|
|
+ conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
|
|
|
+ try {
|
|
|
+ //Equivalent of following (can't do because Simulated is in test dir)
|
|
|
+ // this.data = new SimulatedFSDataset(conf);
|
|
|
+ this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
|
|
|
+ Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
|
|
|
+ } catch (ClassNotFoundException e) {
|
|
|
+ throw new IOException(StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
+ } else { // real storage
|
|
|
+ // read storage info, lock data dirs and transition fs state if necessary
|
|
|
+ storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
|
|
|
+ // adjust
|
|
|
+ this.dnRegistration.setStorageInfo(storage);
|
|
|
+ // initialize data node internal structure
|
|
|
+ this.data = new FSDataset(storage, conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void startPlugins(Configuration conf) {
|
|
|
+ plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
|
|
|
+ for (ServicePlugin p: plugins) {
|
|
|
+ try {
|
|
|
+ p.start(this);
|
|
|
+ LOG.info("Started plug-in " + p);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.warn("ServicePlugin " + p + " could not be started", t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void initIpcServer(Configuration conf) throws IOException {
|
|
|
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
|
|
|
conf.get("dfs.datanode.ipc.address"));
|
|
|
ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
|
|
|
ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false,
|
|
|
conf, blockTokenSecretManager);
|
|
|
-
|
|
|
+
|
|
|
// set service-level authorization security policy
|
|
|
if (conf.getBoolean(
|
|
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
|
@@ -476,19 +427,93 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
|
|
|
-
|
|
|
LOG.info("dnRegistration = " + dnRegistration);
|
|
|
-
|
|
|
- plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
|
|
|
- for (ServicePlugin p: plugins) {
|
|
|
- try {
|
|
|
- p.start(this);
|
|
|
- LOG.info("Started plug-in " + p);
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.warn("ServicePlugin " + p + " could not be started", t);
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void initBlockScanner(Configuration conf) {
|
|
|
+ String reason = null;
|
|
|
+ if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
|
|
|
+ reason = "verification is turned off by configuration";
|
|
|
+ } else if ( !(data instanceof FSDataset) ) {
|
|
|
+ reason = "verifcation is supported only with FSDataset";
|
|
|
+ }
|
|
|
+ if ( reason == null ) {
|
|
|
+ blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
|
|
|
+ } else {
|
|
|
+ LOG.info("Periodic Block Verification is disabled because " +
|
|
|
+ reason + ".");
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void initDataXceiver(Configuration conf) throws IOException {
|
|
|
+ // construct registration
|
|
|
+ InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
|
|
|
+ int tmpPort = socAddr.getPort();
|
|
|
+ this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
|
|
|
+
|
|
|
+ // find free port or use privileged port provided
|
|
|
+ ServerSocket ss;
|
|
|
+ if(secureResources == null) {
|
|
|
+ ss = (socketWriteTimeout > 0) ?
|
|
|
+ ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
+ Server.bind(ss, socAddr, 0);
|
|
|
+ } else {
|
|
|
+ ss = secureResources.getStreamingSocket();
|
|
|
+ }
|
|
|
+ ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
+ // adjust machine name with the actual port
|
|
|
+ tmpPort = ss.getLocalPort();
|
|
|
+ selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
|
+ tmpPort);
|
|
|
+ this.dnRegistration.setName(machineName + ":" + tmpPort);
|
|
|
+ LOG.info("Opened info server at " + tmpPort);
|
|
|
+
|
|
|
+ this.threadGroup = new ThreadGroup("dataXceiverServer");
|
|
|
+ this.dataXceiverServer = new Daemon(threadGroup,
|
|
|
+ new DataXceiverServer(ss, conf, this));
|
|
|
+ this.threadGroup.setDaemon(true); // auto destroy when empty
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This method starts the data node with the specified conf.
|
|
|
+ *
|
|
|
+ * @param conf - the configuration
|
|
|
+ * if conf's CONFIG_PROPERTY_SIMULATED property is set
|
|
|
+ * then a simulated storage based data node is created.
|
|
|
+ *
|
|
|
+ * @param dataDirs - only for a non-simulated storage data node
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ void startDataNode(Configuration conf,
|
|
|
+ AbstractList<File> dataDirs,
|
|
|
+ DatanodeProtocol namenode, SecureResources resources
|
|
|
+ ) throws IOException {
|
|
|
+ if(UserGroupInformation.isSecurityEnabled() && resources == null)
|
|
|
+ throw new RuntimeException("Cannot start secure cluster without " +
|
|
|
+ "privileged resources.");
|
|
|
+
|
|
|
+ this.secureResources = resources;
|
|
|
+ this.namenode = namenode;
|
|
|
+ storage = new DataStorage();
|
|
|
+
|
|
|
+ initConfig(conf);
|
|
|
+ registerMXBean();
|
|
|
+ initDataXceiver(conf);
|
|
|
+ initFsDataSet(conf, dataDirs);
|
|
|
+ initBlockScanner(conf);
|
|
|
+ startInfoServer(conf);
|
|
|
+
|
|
|
+ myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
|
|
|
+ // TODO check what code removed here
|
|
|
+
|
|
|
+ initIpcServer(conf);
|
|
|
+ startPlugins(conf);
|
|
|
+
|
|
|
+ // BlockTokenSecretManager is created here, but it shouldn't be
|
|
|
+ // used until it is initialized in register().
|
|
|
+ this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Determine the http server's effective addr
|