|
@@ -297,6 +297,7 @@ public class DataNode extends Configured
|
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
|
ServerSocketChannel.open().socket() : new ServerSocket();
|
|
Server.bind(ss, socAddr, 0);
|
|
Server.bind(ss, socAddr, 0);
|
|
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
|
+ ss.setSoTimeout(conf.getInt("dfs.dataXceiver.timeoutInMS", 30000)); //30s
|
|
// adjust machine name with the actual port
|
|
// adjust machine name with the actual port
|
|
tmpPort = ss.getLocalPort();
|
|
tmpPort = ss.getLocalPort();
|
|
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
|
|
@@ -538,6 +539,8 @@ public class DataNode extends Configured
|
|
/**
|
|
/**
|
|
* Shut down this instance of the datanode.
|
|
* Shut down this instance of the datanode.
|
|
* Returns only after shutdown is complete.
|
|
* Returns only after shutdown is complete.
|
|
|
|
+ * This method can only be called by the offerService thread.
|
|
|
|
+ * Otherwise, deadlock might occur.
|
|
*/
|
|
*/
|
|
public void shutdown() {
|
|
public void shutdown() {
|
|
if (infoServer != null) {
|
|
if (infoServer != null) {
|
|
@@ -605,7 +608,8 @@ public class DataNode extends Configured
|
|
* when IOException occurs.
|
|
* when IOException occurs.
|
|
* If so, handle the error */
|
|
* If so, handle the error */
|
|
protected void checkDiskError( IOException e ) throws IOException {
|
|
protected void checkDiskError( IOException e ) throws IOException {
|
|
- if (e.getMessage().startsWith("No space left on device")) {
|
|
|
|
|
|
+ if (e.getMessage() != null &&
|
|
|
|
+ e.getMessage().startsWith("No space left on device")) {
|
|
throw new DiskOutOfSpaceException("No space left on device");
|
|
throw new DiskOutOfSpaceException("No space left on device");
|
|
} else {
|
|
} else {
|
|
checkDiskError();
|
|
checkDiskError();
|
|
@@ -623,12 +627,12 @@ public class DataNode extends Configured
|
|
|
|
|
|
private void handleDiskError(String errMsgr) {
|
|
private void handleDiskError(String errMsgr) {
|
|
LOG.warn("DataNode is shutting down.\n" + errMsgr);
|
|
LOG.warn("DataNode is shutting down.\n" + errMsgr);
|
|
|
|
+ shouldRun = false;
|
|
try {
|
|
try {
|
|
namenode.errorReport(
|
|
namenode.errorReport(
|
|
dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
|
|
dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
|
|
} catch(IOException ignored) {
|
|
} catch(IOException ignored) {
|
|
}
|
|
}
|
|
- shutdown();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/** Number of concurrent xceivers per node. */
|
|
/** Number of concurrent xceivers per node. */
|
|
@@ -818,7 +822,9 @@ public class DataNode extends Configured
|
|
case DatanodeProtocol.DNA_REGISTER:
|
|
case DatanodeProtocol.DNA_REGISTER:
|
|
// namenode requested a registration - at start or if NN lost contact
|
|
// namenode requested a registration - at start or if NN lost contact
|
|
LOG.info("DatanodeCommand action: DNA_REGISTER");
|
|
LOG.info("DatanodeCommand action: DNA_REGISTER");
|
|
- register();
|
|
|
|
|
|
+ if (shouldRun) {
|
|
|
|
+ register();
|
|
|
|
+ }
|
|
break;
|
|
break;
|
|
case DatanodeProtocol.DNA_FINALIZE:
|
|
case DatanodeProtocol.DNA_FINALIZE:
|
|
storage.finalizeUpgrade();
|
|
storage.finalizeUpgrade();
|
|
@@ -1140,6 +1146,10 @@ public class DataNode extends Configured
|
|
dn.dataNodeThread.start();
|
|
dn.dataNodeThread.start();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ static boolean isDatanodeUp(DataNode dn) {
|
|
|
|
+ return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
|
|
|
|
+ }
|
|
|
|
|
|
/** Instantiate a single datanode object. This must be run by invoking
|
|
/** Instantiate a single datanode object. This must be run by invoking
|
|
* {@link DataNode#runDatanodeDaemon(DataNode)} subsequently.
|
|
* {@link DataNode#runDatanodeDaemon(DataNode)} subsequently.
|