@@ -98,6 +98,7 @@ import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.Service;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -130,7 +131,7 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode extends Configured
+public class DataNode extends Service
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
 
@@ -204,7 +205,8 @@ public class DataNode extends Configured
 
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
-
+  /** data directories */
+  private AbstractList<File> dataDirs;
   private static final Random R = new Random();
 
   // For InterDataNodeProtocol
@@ -221,19 +223,55 @@ public class DataNode extends Configured
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
+   * <p>
+   * Important: this constructor does not start
+   * the node; it merely initializes it.
+   *
+   * @param conf configuration to use
+   * @param dataDirs list of directories that may be used for data
+   * @throws IOException for historical reasons
    */
-  DataNode(Configuration conf,
+  DataNode(Configuration conf,
            AbstractList<File> dataDirs) throws IOException {
     super(conf);
     DataNode.setDataNode(this);
-    try {
-      startDataNode(conf, dataDirs);
-    } catch (IOException ie) {
-      shutdown();
-      throw ie;
-    }
+    this.dataDirs = dataDirs;
+  }
+
+/////////////////////////////////////////////////////
+// Lifecycle
+/////////////////////////////////////////////////////
+
+  /**
+   * Start any work (in separate threads).
+   *
+   * @throws IOException for any startup failure
+   */
+  @Override
+  public void innerStart() throws IOException {
+    startDataNode(getConf(), dataDirs);
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * This implementation checks for the IPC server running and the
+   * DataNode being registered to a namenode.
+   *
+   * @param status the initial status
+   * @throws IOException for any ping failure
+   * @throws LivenessException if the IPC server is not defined
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (ipcServer == null) {
+      status.addThrowable(new LivenessException("No IPC Server running"));
+    }
+    if (dnRegistration == null) {
+      status.addThrowable(
+          new LivenessException("Not registered to a namenode"));
+    }
+  }
+
   /**
    * This method starts the data node with the specified conf.
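The hunk above is the heart of the change: construction and startup become separate phases. The constructor only records the configuration and data directories; startDataNode() now runs from innerStart() under the Service lifecycle, and health is probed through innerPing(). For orientation, a minimal sketch of driving the new lifecycle, using only the Service calls this patch itself exercises (startService() and the package-private constructor); the sketch class and the data directory path are hypothetical, and it sits in the datanode package purely because the constructor is package-private:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Service;

    // Hypothetical sketch, not part of the patch.
    class DataNodeLifecycleSketch {
      static DataNode startOne() throws IOException {
        Configuration conf = new Configuration();
        ArrayList<File> dirs = new ArrayList<File>();
        dirs.add(new File("/tmp/dfs/data"));    // hypothetical data directory
        DataNode dn = new DataNode(conf, dirs); // initializes only; nothing starts yet
        Service.startService(dn);               // runs innerStart() -> startDataNode()
        return dn;                              // register()/enterLiveState() follow later
      }
    }
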
@@ -362,6 +400,9 @@ public class DataNode extends Configured
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
         tmpInfoPort == 0, conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
+    }
     if (conf.getBoolean("dfs.https.enable", false)) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
@@ -369,6 +410,9 @@ public class DataNode extends Configured
       Configuration sslConf = new Configuration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
+      }
       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
@@ -435,6 +479,10 @@ public class DataNode extends Configured
         } catch (InterruptedException ie) {}
       }
     }
+    if (!shouldRun) {
+      throw new IOException("Datanode shut down during handshake with NameNode "
+          + getNameNodeAddr());
+    }
     String errorMsg = null;
     // verify build version
     if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
@@ -544,10 +592,14 @@ public class DataNode extends Configured
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  private void register() throws IOException {
+  protected void register() throws IOException {
     if (dnRegistration.getStorageID().equals("")) {
       setNewStorageID(dnRegistration);
     }
+    // if we are LIVE, move into the STARTED state, as registration implies that
+    // the node is no longer LIVE
+    enterState(ServiceState.LIVE, ServiceState.STARTED);
+    // spin until the server is up.
     while(shouldRun) {
       try {
         // reset name to machineName. Mainly for web interface.
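register() now doubles as the LIVE/STARTED boundary: a node that re-registers drops back to STARTED until the namenode accepts it again, and enterLiveState() (in the next hunk) promotes it once registration succeeds. The Service base class is not part of this diff, so the following is only a guess at its contract, reduced to a self-contained sketch: enterState(expected, next) as a conditional transition, enterLiveState() as an unconditional one.

    import java.util.concurrent.atomic.AtomicReference;

    // Self-contained sketch; the state names and method names mirror the calls
    // visible in this patch, but the real Service class is not shown here.
    class LifecycleSketch {
      enum ServiceState { CREATED, STARTED, LIVE, CLOSED }

      private final AtomicReference<ServiceState> state =
          new AtomicReference<ServiceState>(ServiceState.CREATED);

      /** Conditional transition: a no-op unless currently in 'expected'. */
      void enterState(ServiceState expected, ServiceState next) {
        state.compareAndSet(expected, next);
      }

      void enterLiveState() {
        state.set(ServiceState.LIVE);
      }

      public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch();
        s.enterState(ServiceState.CREATED, ServiceState.STARTED); // startService()
        s.enterLiveState();                                       // registered
        s.enterState(ServiceState.LIVE, ServiceState.STARTED);    // re-registering
        System.out.println(s.state.get());                        // prints STARTED
      }
    }
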
@@ -598,39 +650,55 @@ public class DataNode extends Configured
       dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
     }
+    // at this point the DataNode now considers itself live.
+    enterLiveState();
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
+
+  /**
+   * Shut down this instance of the datanode. Returns only after shutdown is
+   * complete.
+   */
+  public void shutdown() {
+    closeQuietly();
+  }
+
   /**
    * Shut down this instance of the datanode.
    * Returns only after shutdown is complete.
    * This method can only be called by the offerService thread.
    * Otherwise, deadlock might occur.
    */
-  public void shutdown() {
-    if (plugins != null) {
-      for (ServicePlugin p : plugins) {
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      // disable the should run flag first, so that everything out there starts
+      // to shut down
+      shouldRun = false;
+      if (plugins != null) {
+        for (ServicePlugin p : plugins) {
+          try {
+            p.stop();
+            LOG.info("Stopped plug-in " + p);
+          } catch (Throwable t) {
+            LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+          }
+        }
+      }
+
+      if (infoServer != null) {
         try {
-          p.stop();
-          LOG.info("Stopped plug-in " + p);
-        } catch (Throwable t) {
-          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+          infoServer.stop();
+        } catch (Exception e) {
+          LOG.warn("Exception shutting down DataNode", e);
         }
       }
-    }
-
-    if (infoServer != null) {
-      try {
-        infoServer.stop();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down DataNode", e);
+      if (ipcServer != null) {
+        ipcServer.stop();
       }
     }
-    if (ipcServer != null) {
-      ipcServer.stop();
-    }
-    this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
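Two details in the hunk above are worth calling out: the public shutdown() entry point is now a thin veneer over the lifecycle (closeQuietly() ends up in innerClose()), and shouldRun is cleared first so worker loops start winding down before any sockets close; per-plugin failures are caught as Throwable, logged, and swallowed so one bad plugin cannot block the rest of the teardown. A self-contained sketch of that ordering, with illustrative names (Component, TeardownSketch) that are not in the patch:

    import java.util.Arrays;
    import java.util.List;

    class TeardownSketch {
      interface Component {
        void stop() throws Exception;
      }

      private volatile boolean shouldRun = true;

      void close(List<Component> components) {
        // flip the flag first so polling loops exit before resources vanish
        shouldRun = false;
        for (Component c : components) {
          try {
            c.stop();
          } catch (Throwable t) {
            // log-and-continue: one failing component must not stop the rest
            System.err.println("Ignoring failure to stop " + c + ": " + t);
          }
        }
      }

      public static void main(String[] args) {
        Component bad = new Component() {
          public void stop() throws Exception { throw new Exception("boom"); }
        };
        Component ok = new Component() {
          public void stop() { System.out.println("stopped cleanly"); }
        };
        new TeardownSketch().close(Arrays.asList(bad, ok)); // 'ok' still stops
      }
    }
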
@@ -676,6 +744,8 @@ public class DataNode extends Configured
       try {
         this.storage.unlockAll();
       } catch (IOException ie) {
+        LOG.warn("Ignoring exception when unlocking storage: " + ie,
+            ie);
       }
     }
     if (dataNodeThread != null) {
@@ -1265,7 +1335,9 @@ public class DataNode extends Configured
       startDistributedUpgradeIfNeeded();
       offerService();
     } catch (Exception ex) {
-      LOG.error("Exception: " + StringUtils.stringifyException(ex));
+      LOG.error("Exception while in state " + getServiceState()
+          + " and shouldRun=" + shouldRun + ": " + ex,
+          ex);
       if (shouldRun) {
         try {
           Thread.sleep(5000);
@@ -1357,28 +1429,46 @@ public class DataNode extends Configured
   public static DataNode makeInstance(String[] dataDirs, Configuration conf)
       throws IOException {
     ArrayList<File> dirs = new ArrayList<File>();
-    for (int i = 0; i < dataDirs.length; i++) {
-      File data = new File(dataDirs[i]);
+    StringBuffer invalid = new StringBuffer();
+    for (String dataDir : dataDirs) {
+      File data = new File(dataDir);
       try {
         DiskChecker.checkDir(data);
         dirs.add(data);
       } catch(DiskErrorException e) {
-        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
+        LOG.warn("Invalid directory in dfs.data.dir: " + e, e);
+        invalid.append(dataDir);
+        invalid.append(" ");
       }
     }
-    if (dirs.size() > 0)
-      return new DataNode(conf, dirs);
-    LOG.error("All directories in dfs.data.dir are invalid.");
-    return null;
+    if (dirs.size() > 0) {
+      DataNode dataNode = new DataNode(conf, dirs);
+      Service.startService(dataNode);
+      return dataNode;
+    } else {
+      LOG.error("All directories in dfs.data.dir are invalid: " + invalid);
+      return null;
+    }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "DataNode";
+  }
+
   @Override
   public String toString() {
-    return "DataNode{" +
+    return getServiceName() + " {" +
       "data=" + data +
       ", localName='" + dnRegistration.getName() + "'" +
       ", storageID='" + dnRegistration.getStorageID() + "'" +
       ", xmitsInProgress=" + xmitsInProgress.get() +
+      ", state=" + getServiceState() +
       "}";
   }
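With the final hunk, makeInstance() becomes the one-stop entry point: it validates the configured directories, reports the rejects, and pushes the node through Service.startService() before returning it, so callers get back a started node or null. A hypothetical caller, sketched under the assumption that DataNode lives in the org.apache.hadoop.hdfs.server.datanode package on this branch (the launcher class and paths below are illustrative only):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;

    public class DataNodeLauncherSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] dataDirs = { "/tmp/dfs/data1", "/tmp/dfs/data2" }; // illustrative
        DataNode dn = DataNode.makeInstance(dataDirs, conf);        // started, or null
        if (dn == null) {
          System.err.println("All data directories were rejected");
          return;
        }
        System.out.println(dn); // toString() now appends ", state=" + lifecycle state
        dn.shutdown();          // delegates to closeQuietly() -> innerClose()
      }
    }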