|
@@ -87,7 +87,6 @@ import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
import javax.management.ObjectName;
|
|
@@ -95,8 +94,6 @@ import javax.management.ObjectName;
|
|
|
import java.io.*;
|
|
|
import java.lang.management.ManagementFactory;
|
|
|
import java.net.*;
|
|
|
-import java.nio.channels.ClosedByInterruptException;
|
|
|
-import java.nio.channels.ClosedChannelException;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.*;
|
|
@@ -232,6 +229,11 @@ public class DataNode extends Configured
|
|
|
ReadaheadPool readaheadPool;
|
|
|
private final boolean getHdfsBlockLocationsEnabled;
|
|
|
private ObjectName dataNodeInfoBeanName;
|
|
|
+ /**
+  * Thread that services asynchronous disk check requests. It is null until
+  * checkDiskError() creates it, and it is reset to null when the thread
+  * exits after an interrupt or an unexpected exception. Declared volatile
+  * because it is also accessed without holding checkDiskErrorMutex.
+  */
+ private volatile Thread checkDiskErrorThread = null;
+ /** Sleep time of the checker thread between two polls, in milliseconds. */
+ protected final int checkDiskErrorInterval = 5*1000;
+ /**
+  * True when a disk check has been requested and not yet picked up by the
+  * checker thread. Accessed under checkDiskErrorMutex.
+  */
+ private boolean checkDiskErrorFlag = false;
+ /** Lock protecting checkDiskErrorFlag and lastDiskErrorCheck. */
+ private final Object checkDiskErrorMutex = new Object();
+ /**
+  * Value of System.currentTimeMillis() taken after the most recent disk
+  * check finished; 0 until a check has run.
+  */
+ private long lastDiskErrorCheck;
|
|
|
|
|
|
/**
|
|
|
* Create the DataNode given a configuration, an array of dataDirs,
|
|
@@ -241,6 +243,7 @@ public class DataNode extends Configured
|
|
|
final List<StorageLocation> dataDirs,
|
|
|
final SecureResources resources) throws IOException {
|
|
|
super(conf);
|
|
|
+ this.lastDiskErrorCheck = 0;
|
|
|
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
|
|
|
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
|
|
|
|
|
@@ -1219,6 +1222,11 @@ public class DataNode extends Configured
|
|
|
this.dataXceiverServer.interrupt();
|
|
|
}
|
|
|
|
|
|
+ // Interrupt the checkDiskErrorThread and terminate it.
|
|
|
+ if(this.checkDiskErrorThread != null) {
|
|
|
+ this.checkDiskErrorThread.interrupt();
|
|
|
+ }
|
|
|
+
|
|
|
// Record the time of initial notification
|
|
|
long timeNotified = Time.now();
|
|
|
|
|
@@ -1328,55 +1336,17 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
|
|
|
- /** Check if there is no space in disk
|
|
|
- * @param e that caused this checkDiskError call
|
|
|
- **/
|
|
|
- protected void checkDiskError(Exception e ) throws IOException {
|
|
|
-
|
|
|
- LOG.warn("checkDiskError: exception: ", e);
|
|
|
- if (isNetworkRelatedException(e)) {
|
|
|
- LOG.info("Not checking disk as checkDiskError was called on a network" +
|
|
|
- " related exception");
|
|
|
- return;
|
|
|
- }
|
|
|
- if (e.getMessage() != null &&
|
|
|
- e.getMessage().startsWith("No space left on device")) {
|
|
|
- throw new DiskOutOfSpaceException("No space left on device");
|
|
|
- } else {
|
|
|
- checkDiskError();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Check if the provided exception looks like it's from a network error
|
|
|
- * @param e the exception from a checkDiskError call
|
|
|
- * @return true if this exception is network related, false otherwise
|
|
|
- */
|
|
|
- protected boolean isNetworkRelatedException(Exception e) {
|
|
|
- if (e instanceof SocketException
|
|
|
- || e instanceof SocketTimeoutException
|
|
|
- || e instanceof ClosedChannelException
|
|
|
- || e instanceof ClosedByInterruptException) {
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- String msg = e.getMessage();
|
|
|
-
|
|
|
- return null != msg
|
|
|
- && (msg.startsWith("An established connection was aborted")
|
|
|
- || msg.startsWith("Broken pipe")
|
|
|
- || msg.startsWith("Connection reset")
|
|
|
- || msg.contains("java.nio.channels.SocketChannel"));
|
|
|
- }
|
|
|
-
|
|
|
- /**
- * Check if there is a disk failure and if so, handle the error
- */
+ /**
+  * Request an asynchronous check for disk failures.
+  * <p>
+  * This method does not inspect the disks itself: it only raises
+  * {@code checkDiskErrorFlag} and, when no checker thread exists yet,
+  * creates and starts one. The checker thread runs the actual check and
+  * passes the message of any {@code DiskErrorException} to
+  * {@code handleDiskError}.
+  */
 public void checkDiskError() {
- try {
- data.checkDataDir();
- } catch (DiskErrorException de) {
- handleDiskError(de.getMessage());
+ // The mutex makes the flag update and the lazy thread creation atomic
+ // with respect to the checker thread and to concurrent callers.
+ synchronized(checkDiskErrorMutex) {
+   checkDiskErrorFlag = true;
+   if(checkDiskErrorThread == null) {
+     // startCheckDiskErrorThread() only assigns checkDiskErrorThread;
+     // the new thread has to be started explicitly here.
+     startCheckDiskErrorThread();
+     checkDiskErrorThread.start();
+     LOG.info("Starting CheckDiskError Thread");
+   }
 }
 }
|
|
|
|
|
@@ -1676,13 +1646,8 @@ public class DataNode extends Configured
|
|
|
} catch (IOException ie) {
|
|
|
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
|
|
|
targets[0] + " got ", ie);
|
|
|
- // check if there are any disk problem
|
|
|
- try{
|
|
|
- checkDiskError(ie);
|
|
|
- } catch(IOException e) {
|
|
|
- LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
|
|
|
- }
|
|
|
-
|
|
|
+ // check whether there is any disk problem
|
|
|
+ checkDiskError();
|
|
|
} finally {
|
|
|
xmitsInProgress.getAndDecrement();
|
|
|
IOUtils.closeStream(blockSender);
|
|
@@ -2597,4 +2562,50 @@ public class DataNode extends Configured
|
|
|
public ShortCircuitRegistry getShortCircuitRegistry() {
|
|
|
return shortCircuitRegistry;
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+ /**
+  * Creates, but does not start, the thread that services the disk check
+  * requests raised by {@link #checkDiskError()}; the caller is responsible
+  * for starting it.
+  * <p>
+  * While {@code shouldRun} is true the thread wakes up every
+  * {@code checkDiskErrorInterval} milliseconds, consumes a pending request
+  * (if any), runs {@code data.checkDataDir()} and records the completion
+  * time in {@code lastDiskErrorCheck}. It exits when it is interrupted or
+  * when the check fails with an unexpected exception; in both cases
+  * {@code checkDiskErrorThread} is reset to null so that a later
+  * {@link #checkDiskError()} call creates a replacement. When the loop ends
+  * because {@code shouldRun} became false the field is left untouched.
+  */
+ private void startCheckDiskErrorThread() {
+   checkDiskErrorThread = new Thread(new Runnable() {
+     @Override
+     public void run() {
+       while(shouldRun) {
+         // Read and clear the request in one step so that a request raised
+         // while the check below is running is kept for the next round.
+         boolean checkRequested;
+         synchronized(checkDiskErrorMutex) {
+           checkRequested = checkDiskErrorFlag;
+           checkDiskErrorFlag = false;
+         }
+         if(checkRequested) {
+           try {
+             data.checkDataDir();
+           } catch (DiskErrorException de) {
+             handleDiskError(de.getMessage());
+           } catch (Exception e) {
+             // Hand the exception to the logger so its stack trace is kept.
+             LOG.warn(
+                 "Unexpected exception occurred while checking disk error ",
+                 e);
+             clearCheckDiskErrorThread();
+             return;
+           }
+           synchronized(checkDiskErrorMutex) {
+             lastDiskErrorCheck = System.currentTimeMillis();
+           }
+         }
+         try {
+           Thread.sleep(checkDiskErrorInterval);
+         } catch (InterruptedException e) {
+           // The thread terminates right away, so there is no caller left
+           // that would need the interrupt status restored.
+           LOG.debug("InterruptedException in check disk error thread", e);
+           clearCheckDiskErrorThread();
+           return;
+         }
+       }
+     }
+   });
+ }
+
+ /**
+  * Resets {@code checkDiskErrorThread} to null while holding
+  * {@code checkDiskErrorMutex}, the same lock under which
+  * {@link #checkDiskError()} tests the field, so that the reset is visible
+  * to the next caller and a new checker thread gets created.
+  */
+ private void clearCheckDiskErrorThread() {
+   synchronized(checkDiskErrorMutex) {
+     checkDiskErrorThread = null;
+   }
+ }
|
|
|
+
|
|
|
+ /**
+  * Returns the time at which the checker thread last finished a disk check.
+  *
+  * @return the {@code System.currentTimeMillis()} value recorded after the
+  *         most recent check, or 0 if no check has run since construction
+  */
+ public long getLastDiskErrorCheck() {
+   final long lastCheck;
+   // Same lock the checker thread holds while updating the timestamp.
+   synchronized(checkDiskErrorMutex) {
+     lastCheck = lastDiskErrorCheck;
+   }
+   return lastCheck;
+ }
|
|
|
+}
|