|
@@ -316,6 +316,7 @@ public class DataNode extends Configured
|
|
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
Server.bind(ss, socAddr, 0);
|
|
|
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
+ ss.setSoTimeout(conf.getInt("dfs.dataXceiver.timeoutInMS", 30000)); // accept() timeout in ms; defaults to 30s
|
|
|
// adjust machine name with the actual port
|
|
|
tmpPort = ss.getLocalPort();
|
|
|
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
@@ -556,6 +557,8 @@ public class DataNode extends Configured
|
|
|
/**
|
|
|
* Shut down this instance of the datanode.
|
|
|
* Returns only after shutdown is complete.
|
|
|
+ * This method must only be called by the offerService thread;
|
|
|
+ * otherwise, a deadlock might occur.
|
|
|
*/
|
|
|
public void shutdown() {
|
|
|
if (infoServer != null) {
|
|
@@ -622,7 +625,8 @@ public class DataNode extends Configured
|
|
|
* when IOException occurs.
|
|
|
* If so, handle the error */
|
|
|
private void checkDiskError( IOException e ) throws IOException {
|
|
|
- if (e.getMessage().startsWith("No space left on device")) {
|
|
|
+ if (e.getMessage() != null &&
|
|
|
+ e.getMessage().startsWith("No space left on device")) {
|
|
|
throw new DiskOutOfSpaceException("No space left on device");
|
|
|
} else {
|
|
|
checkDiskError();
|
|
@@ -640,12 +644,12 @@ public class DataNode extends Configured
|
|
|
|
|
|
private void handleDiskError(String errMsgr) {
|
|
|
LOG.warn("DataNode is shutting down.\n" + errMsgr);
|
|
|
+ shouldRun = false;
|
|
|
try {
|
|
|
namenode.errorReport(
|
|
|
dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
|
|
|
} catch(IOException ignored) {
|
|
|
}
|
|
|
- shutdown();
|
|
|
}
|
|
|
|
|
|
/** Number of concurrent xceivers per node. */
|
|
@@ -834,7 +838,9 @@ public class DataNode extends Configured
|
|
|
return false;
|
|
|
case DatanodeProtocol.DNA_REGISTER:
|
|
|
// namenode requested a registration - at start or if NN lost contact
|
|
|
- register();
|
|
|
+ if (shouldRun) {
|
|
|
+ register();
|
|
|
+ }
|
|
|
break;
|
|
|
case DatanodeProtocol.DNA_FINALIZE:
|
|
|
storage.finalizeUpgrade();
|
|
@@ -984,6 +990,8 @@ public class DataNode extends Configured
|
|
|
Socket s = ss.accept();
|
|
|
s.setTcpNoDelay(true);
|
|
|
new Daemon(threadGroup, new DataXceiver(s)).start();
|
|
|
+ } catch (SocketTimeoutException ignored) {
|
|
|
+ // accept() timed out; wake up to check whether we should continue to run
|
|
|
} catch (IOException ie) {
|
|
|
LOG.warn(dnRegistration + ":DataXceiveServer: "
|
|
|
+ StringUtils.stringifyException(ie));
|
|
@@ -2373,6 +2381,11 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
} catch(IOException ioe) {
|
|
|
IOUtils.closeStream(this);
|
|
|
+ IOException cause = FSDataset.getCauseIfDiskError(ioe);
|
|
|
+ if (cause != null) { // possible disk error
|
|
|
+ ioe = cause;
|
|
|
+ checkDiskError(ioe);
|
|
|
+ }
|
|
|
throw ioe;
|
|
|
}
|
|
|
}
|
|
@@ -2965,6 +2978,11 @@ public class DataNode extends Configured
|
|
|
dn.dataNodeThread.start();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /** Check whether a datanode is up, i.e. its dataNodeThread exists and is alive. */
|
|
|
+ static boolean isDatanodeUp(DataNode dn) {
|
|
|
+ return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
|
|
|
+ }
|
|
|
|
|
|
/** Instantiate a single datanode object. This must be run by invoking
|
|
|
* {@link DataNode#runDatanodeDaemon(DataNode)} subsequently.
|