|
@@ -17,6 +17,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
|
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
@@ -92,9 +93,6 @@ public class ContainerLocalizer {
|
|
|
public static final String FILECACHE = "filecache";
|
|
|
public static final String APPCACHE = "appcache";
|
|
|
public static final String USERCACHE = "usercache";
|
|
|
- public static final String OUTPUTDIR = "output";
|
|
|
- public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";
|
|
|
- public static final String WORKDIR = "work";
|
|
|
private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs";
|
|
|
private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
|
|
|
private static final FsPermission FILECACHE_PERMS =
|
|
@@ -115,9 +113,10 @@ public class ContainerLocalizer {
|
|
|
|
|
|
private Set<Thread> localizingThreads =
|
|
|
Collections.synchronizedSet(new HashSet<Thread>());
|
|
|
+ private final String tokenFileName;
|
|
|
|
|
|
public ContainerLocalizer(FileContext lfs, String user, String appId,
|
|
|
- String localizerId, List<Path> localDirs,
|
|
|
+ String localizerId, String tokenFileName, List<Path> localDirs,
|
|
|
RecordFactory recordFactory) throws IOException {
|
|
|
if (null == user) {
|
|
|
throw new IOException("Cannot initialize for null user");
|
|
@@ -139,6 +138,8 @@ public class ContainerLocalizer {
|
|
|
" is loaded.");
|
|
|
this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
|
|
|
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
|
|
|
+ this.tokenFileName = Preconditions.checkNotNull(tokenFileName,
|
|
|
+ "token file name cannot be null");
|
|
|
}
|
|
|
|
|
|
@Private
|
|
@@ -159,8 +160,7 @@ public class ContainerLocalizer {
|
|
|
try {
|
|
|
// assume credentials in cwd
|
|
|
// TODO: Fix
|
|
|
- Path tokenPath =
|
|
|
- new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId));
|
|
|
+ Path tokenPath = new Path(tokenFileName);
|
|
|
credFile = lfs.open(tokenPath);
|
|
|
creds.readTokenStorageStream(credFile);
|
|
|
// Explicitly deleting token file.
|
|
@@ -408,7 +408,9 @@ public class ContainerLocalizer {
|
|
|
*/
|
|
|
public static void buildMainArgs(List<String> command,
|
|
|
String user, String appId, String locId,
|
|
|
- InetSocketAddress nmAddr, List<String> localDirs, Configuration conf) {
|
|
|
+ InetSocketAddress nmAddr,
|
|
|
+ String tokenFileName,
|
|
|
+ List<String> localDirs, Configuration conf) {
|
|
|
|
|
|
String logLevel = conf.get(YarnConfiguration.
|
|
|
NM_CONTAINER_LOCALIZER_LOG_LEVEL,
|
|
@@ -420,6 +422,7 @@ public class ContainerLocalizer {
|
|
|
command.add(locId);
|
|
|
command.add(nmAddr.getHostName());
|
|
|
command.add(Integer.toString(nmAddr.getPort()));
|
|
|
+ command.add(tokenFileName);
|
|
|
for(String dir : localDirs) {
|
|
|
command.add(dir);
|
|
|
}
|
|
@@ -450,8 +453,9 @@ public class ContainerLocalizer {
|
|
|
String locId = argv[2];
|
|
|
InetSocketAddress nmAddr =
|
|
|
new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
|
|
|
- String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
|
|
|
- ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
|
|
|
+ String tokenFileName = argv[5];
|
|
|
+ String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length);
|
|
|
+ ArrayList<Path> localDirs = new ArrayList<>(sLocaldirs.length);
|
|
|
for (String sLocaldir : sLocaldirs) {
|
|
|
localDirs.add(new Path(sLocaldir));
|
|
|
}
|
|
@@ -463,12 +467,11 @@ public class ContainerLocalizer {
|
|
|
LOG.warn("Localization running as " + uid + " not " + user);
|
|
|
}
|
|
|
|
|
|
- ContainerLocalizer localizer =
|
|
|
- new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
|
|
|
- appId, locId, localDirs,
|
|
|
+ ContainerLocalizer localizer = new ContainerLocalizer(
|
|
|
+ FileContext.getLocalFSFileContext(), user,
|
|
|
+ appId, locId, tokenFileName, localDirs,
|
|
|
RecordFactoryProvider.getRecordFactory(null));
|
|
|
localizer.runLocalization(nmAddr);
|
|
|
- return;
|
|
|
} catch (Throwable e) {
|
|
|
// Print traces to stdout so that they can be logged by the NM address
|
|
|
// space in both DefaultCE and LCE cases
|