|
@@ -18,6 +18,8 @@
|
|
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
|
|
|
|
|
import static org.apache.hadoop.util.Shell.getAllShells;
|
|
|
+
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -93,7 +95,6 @@ public class ContainerLocalizer {
|
|
|
public static final String FILECACHE = "filecache";
|
|
|
public static final String APPCACHE = "appcache";
|
|
|
public static final String USERCACHE = "usercache";
|
|
|
- public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";
|
|
|
private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs";
|
|
|
private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
|
|
|
private static final FsPermission FILECACHE_PERMS =
|
|
@@ -114,9 +115,10 @@ public class ContainerLocalizer {
|
|
|
|
|
|
private Set<Thread> localizingThreads =
|
|
|
Collections.synchronizedSet(new HashSet<>());
|
|
|
+ private final String tokenFileName;
|
|
|
|
|
|
public ContainerLocalizer(FileContext lfs, String user, String appId,
|
|
|
- String localizerId, List<Path> localDirs,
|
|
|
+ String localizerId, String tokenFileName, List<Path> localDirs,
|
|
|
RecordFactory recordFactory) throws IOException {
|
|
|
if (null == user) {
|
|
|
throw new IOException("Cannot initialize for null user");
|
|
@@ -135,6 +137,8 @@ public class ContainerLocalizer {
|
|
|
YarnConfiguration.DEFAULT_DISK_VALIDATOR);
|
|
|
this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
|
|
|
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
|
|
|
+ this.tokenFileName = Preconditions.checkNotNull(tokenFileName,
|
|
|
+ "token file name cannot be null");
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -161,8 +165,7 @@ public class ContainerLocalizer {
|
|
|
try {
|
|
|
// assume credentials in cwd
|
|
|
// TODO: Fix
|
|
|
- Path tokenPath =
|
|
|
- new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId));
|
|
|
+ Path tokenPath = new Path(tokenFileName);
|
|
|
credFile = lfs.open(tokenPath);
|
|
|
creds.readTokenStorageStream(credFile);
|
|
|
// Explicitly deleting token file.
|
|
@@ -411,7 +414,9 @@ public class ContainerLocalizer {
|
|
|
*/
|
|
|
public static void buildMainArgs(List<String> command,
|
|
|
String user, String appId, String locId,
|
|
|
- InetSocketAddress nmAddr, List<String> localDirs, Configuration conf) {
|
|
|
+ InetSocketAddress nmAddr,
|
|
|
+ String tokenFileName,
|
|
|
+ List<String> localDirs, Configuration conf) {
|
|
|
|
|
|
String logLevel = conf.get(YarnConfiguration.
|
|
|
NM_CONTAINER_LOCALIZER_LOG_LEVEL,
|
|
@@ -423,6 +428,7 @@ public class ContainerLocalizer {
|
|
|
command.add(locId);
|
|
|
command.add(nmAddr.getHostName());
|
|
|
command.add(Integer.toString(nmAddr.getPort()));
|
|
|
+ command.add(tokenFileName);
|
|
|
for(String dir : localDirs) {
|
|
|
command.add(dir);
|
|
|
}
|
|
@@ -453,8 +459,9 @@ public class ContainerLocalizer {
|
|
|
String locId = argv[2];
|
|
|
InetSocketAddress nmAddr =
|
|
|
new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
|
|
|
- String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
|
|
|
- ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
|
|
|
+ String tokenFileName = argv[5];
|
|
|
+ String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length);
|
|
|
+ ArrayList<Path> localDirs = new ArrayList<>(sLocaldirs.length);
|
|
|
for (String sLocaldir : sLocaldirs) {
|
|
|
localDirs.add(new Path(sLocaldir));
|
|
|
}
|
|
@@ -466,12 +473,11 @@ public class ContainerLocalizer {
|
|
|
LOG.warn("Localization running as " + uid + " not " + user);
|
|
|
}
|
|
|
|
|
|
- ContainerLocalizer localizer =
|
|
|
- new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
|
|
|
- appId, locId, localDirs,
|
|
|
+ ContainerLocalizer localizer = new ContainerLocalizer(
|
|
|
+ FileContext.getLocalFSFileContext(), user,
|
|
|
+ appId, locId, tokenFileName, localDirs,
|
|
|
RecordFactoryProvider.getRecordFactory(null));
|
|
|
localizer.runLocalization(nmAddr);
|
|
|
- return;
|
|
|
} catch (Throwable e) {
|
|
|
// Print traces to stdout so that they can be logged by the NM address
|
|
|
// space in both DefaultCE and LCE cases
|