|
@@ -18,7 +18,6 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app;
|
|
package org.apache.hadoop.mapreduce.v2.app;
|
|
|
|
|
|
-import java.io.File;
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.lang.reflect.Constructor;
|
|
import java.lang.reflect.Constructor;
|
|
import java.lang.reflect.InvocationTargetException;
|
|
import java.lang.reflect.InvocationTargetException;
|
|
@@ -37,7 +36,6 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
-import org.apache.hadoop.fs.FileContext;
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.mapred.FileOutputCommitter;
|
|
import org.apache.hadoop.mapred.FileOutputCommitter;
|
|
@@ -107,6 +105,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ShutdownHookManager;
|
|
import org.apache.hadoop.util.ShutdownHookManager;
|
|
import org.apache.hadoop.util.StringInterner;
|
|
import org.apache.hadoop.util.StringInterner;
|
|
@@ -125,6 +124,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
|
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
import org.apache.hadoop.yarn.service.Service;
|
|
import org.apache.hadoop.yarn.service.Service;
|
|
@@ -192,7 +192,7 @@ public class MRAppMaster extends CompositeService {
|
|
private SpeculatorEventDispatcher speculatorEventDispatcher;
|
|
private SpeculatorEventDispatcher speculatorEventDispatcher;
|
|
|
|
|
|
private Job job;
|
|
private Job job;
|
|
- private Credentials fsTokens = new Credentials(); // Filled during init
|
|
|
|
|
|
+ private Credentials jobCredentials = new Credentials(); // Filled during init
|
|
protected UserGroupInformation currentUser; // Will be setup during init
|
|
protected UserGroupInformation currentUser; // Will be setup during init
|
|
|
|
|
|
private volatile boolean isLastAMRetry = false;
|
|
private volatile boolean isLastAMRetry = false;
|
|
@@ -231,7 +231,7 @@ public class MRAppMaster extends CompositeService {
|
|
protected void serviceInit(final Configuration conf) throws Exception {
|
|
protected void serviceInit(final Configuration conf) throws Exception {
|
|
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
|
|
|
|
|
- downloadTokensAndSetupUGI(conf);
|
|
|
|
|
|
+ initJobCredentialsAndUGI(conf);
|
|
|
|
|
|
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
|
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
|
LOG.info("The specific max attempts: " + maxAppAttempts +
|
|
LOG.info("The specific max attempts: " + maxAppAttempts +
|
|
@@ -470,7 +470,7 @@ public class MRAppMaster extends CompositeService {
|
|
}
|
|
}
|
|
|
|
|
|
protected Credentials getCredentials() {
|
|
protected Credentials getCredentials() {
|
|
- return fsTokens;
|
|
|
|
|
|
+ return jobCredentials;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -590,7 +590,7 @@ public class MRAppMaster extends CompositeService {
|
|
// create single job
|
|
// create single job
|
|
Job newJob =
|
|
Job newJob =
|
|
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
|
|
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
|
|
- taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
|
|
|
|
|
|
+ taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
|
|
completedTasksFromPreviousRun, metrics,
|
|
completedTasksFromPreviousRun, metrics,
|
|
committer, newApiCommitter,
|
|
committer, newApiCommitter,
|
|
currentUser.getUserName(), appSubmitTime, amInfos, context,
|
|
currentUser.getUserName(), appSubmitTime, amInfos, context,
|
|
@@ -607,22 +607,11 @@ public class MRAppMaster extends CompositeService {
|
|
* Obtain the tokens needed by the job and put them in the UGI
|
|
* Obtain the tokens needed by the job and put them in the UGI
|
|
* @param conf
|
|
* @param conf
|
|
*/
|
|
*/
|
|
- protected void downloadTokensAndSetupUGI(Configuration conf) {
|
|
|
|
|
|
+ protected void initJobCredentialsAndUGI(Configuration conf) {
|
|
|
|
|
|
try {
|
|
try {
|
|
this.currentUser = UserGroupInformation.getCurrentUser();
|
|
this.currentUser = UserGroupInformation.getCurrentUser();
|
|
-
|
|
|
|
- // Read the file-system tokens from the localized tokens-file.
|
|
|
|
- Path jobSubmitDir =
|
|
|
|
- FileContext.getLocalFSFileContext().makeQualified(
|
|
|
|
- new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
|
|
|
|
- .getAbsolutePath()));
|
|
|
|
- Path jobTokenFile =
|
|
|
|
- new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
|
|
|
|
- fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
|
|
|
|
- LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
|
|
|
|
- + jobTokenFile);
|
|
|
|
- currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
|
|
|
|
|
|
+ this.jobCredentials = ((JobConf)conf).getCredentials();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
throw new YarnRuntimeException(e);
|
|
throw new YarnRuntimeException(e);
|
|
}
|
|
}
|
|
@@ -1034,7 +1023,7 @@ public class MRAppMaster extends CompositeService {
|
|
// are reducers as the shuffle secret would be app attempt specific.
|
|
// are reducers as the shuffle secret would be app attempt specific.
|
|
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
|
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
|
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
|
|
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
|
|
- TokenCache.getShuffleSecretKey(fsTokens) != null);
|
|
|
|
|
|
+ TokenCache.getShuffleSecretKey(jobCredentials) != null);
|
|
|
|
|
|
if (recoveryEnabled && recoverySupportedByCommitter
|
|
if (recoveryEnabled && recoverySupportedByCommitter
|
|
&& shuffleKeyValidForRecovery) {
|
|
&& shuffleKeyValidForRecovery) {
|
|
@@ -1365,9 +1354,23 @@ public class MRAppMaster extends CompositeService {
|
|
// them
|
|
// them
|
|
Credentials credentials =
|
|
Credentials credentials =
|
|
UserGroupInformation.getCurrentUser().getCredentials();
|
|
UserGroupInformation.getCurrentUser().getCredentials();
|
|
|
|
+ LOG.info("Executing with tokens:");
|
|
|
|
+ for (Token<?> token : credentials.getAllTokens()) {
|
|
|
|
+ LOG.info(token);
|
|
|
|
+ }
|
|
|
|
+
|
|
UserGroupInformation appMasterUgi = UserGroupInformation
|
|
UserGroupInformation appMasterUgi = UserGroupInformation
|
|
.createRemoteUser(jobUserName);
|
|
.createRemoteUser(jobUserName);
|
|
appMasterUgi.addCredentials(credentials);
|
|
appMasterUgi.addCredentials(credentials);
|
|
|
|
+
|
|
|
|
+ // Now remove the AM->RM token so tasks don't have it
|
|
|
|
+ Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
|
|
|
|
+ while (iter.hasNext()) {
|
|
|
|
+ Token<?> token = iter.next();
|
|
|
|
+ if (token.getKind().equals(ApplicationTokenIdentifier.KIND_NAME)) {
|
|
|
|
+ iter.remove();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
conf.getCredentials().addAll(credentials);
|
|
conf.getCredentials().addAll(credentials);
|
|
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
@Override
|
|
@Override
|