|
@@ -26,9 +26,12 @@ import java.util.TimerTask;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
+import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
|
|
@@ -120,6 +123,19 @@ public class LocalDirsHandlerService extends AbstractService {
|
|
|
lastDisksCheckTime = System.currentTimeMillis();
|
|
|
super.init(conf);
|
|
|
|
|
|
+ FileContext localFs;
|
|
|
+ try {
|
|
|
+ localFs = FileContext.getLocalFSFileContext(config);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new YarnException("Unable to get the local filesystem", e);
|
|
|
+ }
|
|
|
+ FsPermission perm = new FsPermission((short)0755);
|
|
|
+ boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm);
|
|
|
+ createSucceeded &= logDirs.createNonExistentDirs(localFs, perm);
|
|
|
+ if (!createSucceeded) {
|
|
|
+ updateDirsAfterFailure();
|
|
|
+ }
|
|
|
+
|
|
|
// Check the disk health immediately to weed out bad directories
|
|
|
// before other init code attempts to use them.
|
|
|
checkDirs();
|
|
@@ -229,7 +245,8 @@ public class LocalDirsHandlerService extends AbstractService {
|
|
|
* Set good local dirs and good log dirs in the configuration so that the
|
|
|
* LocalDirAllocator objects will use this updated configuration only.
|
|
|
*/
|
|
|
- private void updateDirsInConfiguration() {
|
|
|
+ private void updateDirsAfterFailure() {
|
|
|
+ LOG.info("Disk(s) failed. " + getDisksHealthReport());
|
|
|
Configuration conf = getConfig();
|
|
|
List<String> localDirs = getLocalDirs();
|
|
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
|
|
@@ -237,6 +254,10 @@ public class LocalDirsHandlerService extends AbstractService {
|
|
|
List<String> logDirs = getLogDirs();
|
|
|
conf.setStrings(YarnConfiguration.NM_LOG_DIRS,
|
|
|
logDirs.toArray(new String[logDirs.size()]));
|
|
|
+ if (!areDisksHealthy()) {
|
|
|
+ // Just log.
|
|
|
+ LOG.error("Most of the disks failed. " + getDisksHealthReport());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void checkDirs() {
|
|
@@ -249,12 +270,7 @@ public class LocalDirsHandlerService extends AbstractService {
|
|
|
}
|
|
|
|
|
|
if (newFailure) {
|
|
|
- LOG.info("Disk(s) failed. " + getDisksHealthReport());
|
|
|
- updateDirsInConfiguration();
|
|
|
- if (!areDisksHealthy()) {
|
|
|
- // Just log.
|
|
|
- LOG.error("Most of the disks failed. " + getDisksHealthReport());
|
|
|
- }
|
|
|
+ updateDirsAfterFailure();
|
|
|
}
|
|
|
lastDisksCheckTime = System.currentTimeMillis();
|
|
|
}
|