|
@@ -57,7 +57,6 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
@@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
-import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
@@ -81,6 +79,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
|
@@ -125,19 +124,18 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
private InetSocketAddress localizationServerAddress;
|
|
|
private long cacheTargetSize;
|
|
|
private long cacheCleanupPeriod;
|
|
|
- private List<Path> logDirs;
|
|
|
- private List<Path> localDirs;
|
|
|
- private List<Path> sysDirs;
|
|
|
+
|
|
|
private final ContainerExecutor exec;
|
|
|
protected final Dispatcher dispatcher;
|
|
|
private final DeletionService delService;
|
|
|
private LocalizerTracker localizerTracker;
|
|
|
private RecordFactory recordFactory;
|
|
|
- private final LocalDirAllocator localDirsSelector;
|
|
|
private final ScheduledExecutorService cacheCleanup;
|
|
|
|
|
|
private final LocalResourcesTracker publicRsrc;
|
|
|
-
|
|
|
+
|
|
|
+ private LocalDirsHandlerService dirsHandler;
|
|
|
+
|
|
|
/**
|
|
|
* Map of LocalResourceTrackers keyed by username, for private
|
|
|
* resources.
|
|
@@ -153,12 +151,15 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
new ConcurrentHashMap<String,LocalResourcesTracker>();
|
|
|
|
|
|
public ResourceLocalizationService(Dispatcher dispatcher,
|
|
|
- ContainerExecutor exec, DeletionService delService) {
|
|
|
+ ContainerExecutor exec, DeletionService delService,
|
|
|
+ LocalDirsHandlerService dirsHandler) {
|
|
|
+
|
|
|
super(ResourceLocalizationService.class.getName());
|
|
|
this.exec = exec;
|
|
|
this.dispatcher = dispatcher;
|
|
|
this.delService = delService;
|
|
|
- this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
|
|
|
+ this.dirsHandler = dirsHandler;
|
|
|
+
|
|
|
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
|
|
|
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
|
|
|
new ThreadFactoryBuilder()
|
|
@@ -177,41 +178,31 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
@Override
|
|
|
public void init(Configuration conf) {
|
|
|
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
|
|
+
|
|
|
try {
|
|
|
// TODO queue deletions here, rather than NM init?
|
|
|
FileContext lfs = getLocalFileContext(conf);
|
|
|
- String[] sLocalDirs =
|
|
|
- conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
|
|
|
-
|
|
|
- localDirs = new ArrayList<Path>(sLocalDirs.length);
|
|
|
- logDirs = new ArrayList<Path>(sLocalDirs.length);
|
|
|
- sysDirs = new ArrayList<Path>(sLocalDirs.length);
|
|
|
- for (String sLocaldir : sLocalDirs) {
|
|
|
- Path localdir = new Path(sLocaldir);
|
|
|
- localDirs.add(localdir);
|
|
|
+ List<String> localDirs = dirsHandler.getLocalDirs();
|
|
|
+ for (String localDir : localDirs) {
|
|
|
// $local/usercache
|
|
|
- Path userdir = new Path(localdir, ContainerLocalizer.USERCACHE);
|
|
|
- lfs.mkdir(userdir, null, true);
|
|
|
+ Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
|
|
+ lfs.mkdir(userDir, null, true);
|
|
|
// $local/filecache
|
|
|
- Path filedir = new Path(localdir, ContainerLocalizer.FILECACHE);
|
|
|
- lfs.mkdir(filedir, null, true);
|
|
|
+ Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
|
|
|
+ lfs.mkdir(fileDir, null, true);
|
|
|
// $local/nmPrivate
|
|
|
- Path sysdir = new Path(localdir, NM_PRIVATE_DIR);
|
|
|
- lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
|
|
|
- sysDirs.add(sysdir);
|
|
|
+ Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
|
|
|
+ lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
|
|
|
}
|
|
|
- String[] sLogdirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
|
|
|
- for (String sLogdir : sLogdirs) {
|
|
|
- Path logdir = new Path(sLogdir);
|
|
|
- logDirs.add(logdir);
|
|
|
- lfs.mkdir(logdir, null, true);
|
|
|
+
|
|
|
+ List<String> logDirs = dirsHandler.getLogDirs();
|
|
|
+ for (String logDir : logDirs) {
|
|
|
+ lfs.mkdir(new Path(logDir), null, true);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
throw new YarnException("Failed to initialize LocalizationService", e);
|
|
|
}
|
|
|
- localDirs = Collections.unmodifiableList(localDirs);
|
|
|
- logDirs = Collections.unmodifiableList(logDirs);
|
|
|
- sysDirs = Collections.unmodifiableList(sysDirs);
|
|
|
+
|
|
|
cacheTargetSize =
|
|
|
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
|
|
|
cacheCleanupPeriod =
|
|
@@ -391,7 +382,7 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
String containerIDStr = c.toString();
|
|
|
String appIDStr = ConverterUtils.toString(
|
|
|
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
|
|
- for (Path localDir : localDirs) {
|
|
|
+ for (String localDir : dirsHandler.getLocalDirs()) {
|
|
|
|
|
|
// Delete the user-owned container-dir
|
|
|
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
|
@@ -428,7 +419,7 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
// Delete the application directories
|
|
|
userName = application.getUser();
|
|
|
appIDStr = application.toString();
|
|
|
- for (Path localDir : localDirs) {
|
|
|
+ for (String localDir : dirsHandler.getLocalDirs()) {
|
|
|
|
|
|
// Delete the user-owned app-dir
|
|
|
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
|
@@ -574,12 +565,9 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
|
|
|
class PublicLocalizer extends Thread {
|
|
|
|
|
|
- static final String PUBCACHE_CTXT = "public.cache.dirs";
|
|
|
-
|
|
|
final FileContext lfs;
|
|
|
final Configuration conf;
|
|
|
final ExecutorService threadPool;
|
|
|
- final LocalDirAllocator publicDirs;
|
|
|
final CompletionService<Path> queue;
|
|
|
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
|
|
|
// TODO hack to work around broken signaling
|
|
@@ -601,13 +589,23 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
this.conf = conf;
|
|
|
this.pending = pending;
|
|
|
this.attempts = attempts;
|
|
|
- String[] publicFilecache = new String[localDirs.size()];
|
|
|
- for (int i = 0, n = localDirs.size(); i < n; ++i) {
|
|
|
- publicFilecache[i] =
|
|
|
- new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
|
|
|
- }
|
|
|
- conf.setStrings(PUBCACHE_CTXT, publicFilecache);
|
|
|
- this.publicDirs = new LocalDirAllocator(PUBCACHE_CTXT);
|
|
|
+// List<String> localDirs = dirsHandler.getLocalDirs();
|
|
|
+// String[] publicFilecache = new String[localDirs.size()];
|
|
|
+// for (int i = 0, n = localDirs.size(); i < n; ++i) {
|
|
|
+// publicFilecache[i] =
|
|
|
+// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
|
|
|
+// }
|
|
|
+// conf.setStrings(PUBCACHE_CTXT, publicFilecache);
|
|
|
+
|
|
|
+// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
|
|
|
+// List<String> localDirs = dirsHandler.getLocalDirs();
|
|
|
+// String[] publicFilecache = new String[localDirs.size()];
|
|
|
+// int i = 0;
|
|
|
+// for (String localDir : localDirs) {
|
|
|
+// publicFilecache[i++] =
|
|
|
+// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
|
|
|
+// }
|
|
|
+
|
|
|
this.threadPool = threadPool;
|
|
|
this.queue = new ExecutorCompletionService<Path>(threadPool);
|
|
|
}
|
|
@@ -619,11 +617,19 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
synchronized (attempts) {
|
|
|
List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
|
|
|
if (null == sigh) {
|
|
|
- pending.put(queue.submit(new FSDownload(
|
|
|
- lfs, null, conf, publicDirs,
|
|
|
- request.getResource().getRequest(), new Random())),
|
|
|
- request);
|
|
|
- attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
|
|
|
+ LocalResource resource = request.getResource().getRequest();
|
|
|
+ try {
|
|
|
+ Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
|
|
|
+ "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
|
|
|
+ ContainerLocalizer.getEstimatedSize(resource), true);
|
|
|
+ pending.put(queue.submit(new FSDownload(
|
|
|
+ lfs, null, conf, publicDirDestPath, resource, new Random())),
|
|
|
+ request);
|
|
|
+ attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Local path for public localization is not found. "
|
|
|
+ + " May be disks failed.", e);
|
|
|
+ }
|
|
|
} else {
|
|
|
sigh.add(request);
|
|
|
}
|
|
@@ -844,24 +850,30 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
public void run() {
|
|
|
Path nmPrivateCTokensPath = null;
|
|
|
try {
|
|
|
- // Use LocalDirAllocator to get nmPrivateDir
|
|
|
+ // Get nmPrivateDir
|
|
|
nmPrivateCTokensPath =
|
|
|
- localDirsSelector.getLocalPathForWrite(
|
|
|
- NM_PRIVATE_DIR
|
|
|
- + Path.SEPARATOR
|
|
|
+ dirsHandler.getLocalPathForWrite(
|
|
|
+ NM_PRIVATE_DIR + Path.SEPARATOR
|
|
|
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
|
|
- localizerId), getConfig());
|
|
|
+ localizerId));
|
|
|
|
|
|
// 0) init queue, etc.
|
|
|
// 1) write credentials to private dir
|
|
|
writeCredentials(nmPrivateCTokensPath);
|
|
|
// 2) exec initApplication and wait
|
|
|
- exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
|
|
|
- context.getUser(),
|
|
|
- ConverterUtils.toString(
|
|
|
- context.getContainerId().
|
|
|
- getApplicationAttemptId().getApplicationId()),
|
|
|
- localizerId, localDirs);
|
|
|
+ List<String> localDirs = dirsHandler.getLocalDirs();
|
|
|
+ List<String> logDirs = dirsHandler.getLogDirs();
|
|
|
+ if (dirsHandler.areDisksHealthy()) {
|
|
|
+ exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
|
|
|
+ context.getUser(),
|
|
|
+ ConverterUtils.toString(
|
|
|
+ context.getContainerId().
|
|
|
+ getApplicationAttemptId().getApplicationId()),
|
|
|
+ localizerId, localDirs, logDirs);
|
|
|
+ } else {
|
|
|
+ throw new IOException("All disks failed. "
|
|
|
+ + dirsHandler.getDisksHealthReport());
|
|
|
+ }
|
|
|
// TODO handle ExitCodeException separately?
|
|
|
} catch (Exception e) {
|
|
|
LOG.info("Localizer failed", e);
|