|
@@ -131,7 +131,9 @@ public class MRAppMaster extends CompositeService {
|
|
|
private JobEventDispatcher jobEventDispatcher;
|
|
|
|
|
|
private Job job;
|
|
|
-
|
|
|
+ private Credentials fsTokens = new Credentials(); // Filled during init
|
|
|
+ private UserGroupInformation currentUser; // Will be setup during init
|
|
|
+
|
|
|
public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
|
|
|
this(applicationAttemptId, new SystemClock());
|
|
|
}
|
|
@@ -146,6 +148,9 @@ public class MRAppMaster extends CompositeService {
|
|
|
|
|
|
@Override
|
|
|
public void init(final Configuration conf) {
|
|
|
+
|
|
|
+ downloadTokensAndSetupUGI(conf);
|
|
|
+
|
|
|
context = new RunningAppContext(conf);
|
|
|
|
|
|
// Job name is the same as the app name util we support DAG of jobs
|
|
@@ -226,37 +231,6 @@ public class MRAppMaster extends CompositeService {
|
|
|
/** Create and initialize (but don't start) a single job. */
|
|
|
protected Job createJob(Configuration conf) {
|
|
|
|
|
|
- // ////////// Obtain the tokens needed by the job. //////////
|
|
|
- Credentials fsTokens = new Credentials();
|
|
|
- UserGroupInformation currentUser = null;
|
|
|
-
|
|
|
- try {
|
|
|
- currentUser = UserGroupInformation.getCurrentUser();
|
|
|
-
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- // Read the file-system tokens from the localized tokens-file.
|
|
|
- Path jobSubmitDir =
|
|
|
- FileContext.getLocalFSFileContext().makeQualified(
|
|
|
- new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
|
|
|
- .getAbsolutePath()));
|
|
|
- Path jobTokenFile =
|
|
|
- new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
|
|
|
- fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
|
|
|
- LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
|
|
|
- + jobTokenFile);
|
|
|
-
|
|
|
- for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
|
|
|
- LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
|
|
|
- + "in current ugi in the AppMaster for service "
|
|
|
- + tk.getService());
|
|
|
- currentUser.addToken(tk); // For use by AppMaster itself.
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- throw new YarnException(e);
|
|
|
- }
|
|
|
- // ////////// End of obtaining the tokens needed by the job. //////////
|
|
|
-
|
|
|
// create single job
|
|
|
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
|
|
|
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
|
|
@@ -297,6 +271,42 @@ public class MRAppMaster extends CompositeService {
|
|
|
return newJob;
|
|
|
} // end createJob()
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Obtain the tokens needed by the job and put them in the UGI
|
|
|
+ * @param conf
|
|
|
+ */
|
|
|
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
|
|
|
+
|
|
|
+ try {
|
|
|
+ this.currentUser = UserGroupInformation.getCurrentUser();
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ // Read the file-system tokens from the localized tokens-file.
|
|
|
+ Path jobSubmitDir =
|
|
|
+ FileContext.getLocalFSFileContext().makeQualified(
|
|
|
+ new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
|
|
|
+ .getAbsolutePath()));
|
|
|
+ Path jobTokenFile =
|
|
|
+ new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
|
|
|
+ fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
|
|
|
+ LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
|
|
|
+ + jobTokenFile);
|
|
|
+
|
|
|
+ for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Token of kind " + tk.getKind()
|
|
|
+ + "in current ugi in the AppMaster for service "
|
|
|
+ + tk.getService());
|
|
|
+ }
|
|
|
+ currentUser.addToken(tk); // For use by AppMaster itself.
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected void addIfService(Object object) {
|
|
|
if (object instanceof Service) {
|
|
|
addService((Service) object);
|