
commit 2839699a7d8d8bb9141cd00291414ff02ddc5168
Author: Arun C Murthy <acmurthy@apache.org>
Date: Sat Feb 27 03:26:42 2010 -0800

MAPREDUCE-1528 from https://issues.apache.org/jira/secure/attachment/12437339/MAPREDUCE-1528_yhadoop20.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077251 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago
parent
current commit
842d7dc58f
28 changed files with 173 additions and 181 deletions
  1. src/core/org/apache/hadoop/security/Credentials.java (+2 -2)
  2. src/core/org/apache/hadoop/security/UserGroupInformation.java (+1 -1)
  3. src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java (+2 -1)
  4. src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java (+2 -2)
  5. src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (+3 -3)
  6. src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (+6 -2)
  7. src/mapred/org/apache/hadoop/mapred/Child.java (+11 -5)
  8. src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (+1 -1)
  9. src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (+2 -1)
  10. src/mapred/org/apache/hadoop/mapred/JobClient.java (+13 -8)
  11. src/mapred/org/apache/hadoop/mapred/JobConf.java (+21 -0)
  12. src/mapred/org/apache/hadoop/mapred/JobInProgress.java (+4 -4)
  13. src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (+2 -2)
  14. src/mapred/org/apache/hadoop/mapred/JobTracker.java (+3 -6)
  15. src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (+6 -4)
  16. src/mapred/org/apache/hadoop/mapred/Reporter.java (+1 -0)
  17. src/mapred/org/apache/hadoop/mapred/Task.java (+2 -1)
  18. src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+2 -2)
  19. src/mapred/org/apache/hadoop/mapreduce/JobContext.java (+11 -0)
  20. src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (+2 -1)
  21. src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (+3 -1)
  22. src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (+20 -85)
  23. src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (+2 -2)
  24. src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (+34 -33)
  25. src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (+3 -3)
  26. src/test/org/apache/hadoop/security/TestJobCredentials.java (+4 -4)
  27. src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java (+2 -2)
  28. src/tools/org/apache/hadoop/tools/DistCp.java (+8 -5)

+ 2 - 2
src/core/org/apache/hadoop/security/TokenStorage.java → src/core/org/apache/hadoop/security/Credentials.java

@@ -39,7 +39,7 @@ import org.apache.hadoop.conf.Configuration;
  * A class that provides the facilities of reading and writing 
  * secret keys and Tokens.
  */
-public class TokenStorage implements Writable {
+public class Credentials implements Writable {
 
   private  Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
   private  Map<Text, Token<? extends TokenIdentifier>> tokenMap = 
@@ -115,7 +115,7 @@ public class TokenStorage implements Writable {
     Path localTokensFile = new Path (filename);
     FileSystem localFS = FileSystem.getLocal(conf);
     FSDataInputStream in = localFS.open(localTokensFile);
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     ts.readFields(in);
     for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
       ugi.addToken(token);

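[Review note] The rename keeps TokenStorage's Writable semantics, so token files written before this patch stay readable. A minimal round-trip sketch using only methods visible in this diff; the file name and key alias are hypothetical:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;

    public class CredentialsRoundTrip {
      public static void main(String[] args) throws Exception {
        Credentials creds = new Credentials();
        creds.addSecretKey(new Text("alias1"), "secret".getBytes());

        // Serialize, as DelegationTokenServlet does with ts.write(dosFinal).
        DataOutputStream out =
            new DataOutputStream(new FileOutputStream("creds.bin"));
        creds.write(out);
        out.close();

        // Deserialize, as Credentials.readTokensAndLoadInUGI does.
        Credentials read = new Credentials();
        DataInputStream in =
            new DataInputStream(new FileInputStream("creds.bin"));
        read.readFields(in);
        in.close();
        System.out.println("# secret keys = " + read.numberOfSecretKeys());
      }
    }
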
+ 1 - 1
src/core/org/apache/hadoop/security/UserGroupInformation.java

@@ -366,7 +366,7 @@ public class UserGroupInformation {
         loginUser = new UserGroupInformation(login.getSubject());
         String tokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
         if (tokenFile != null && isSecurityEnabled()) {
-          TokenStorage.readTokensAndLoadInUGI(tokenFile, new Configuration(), loginUser);
+          Credentials.readTokensAndLoadInUGI(tokenFile, new Configuration(), loginUser);
         }
       } catch (LoginException le) {
         throw new IOException("failure to login", le);

+ 2 - 1
src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java

@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -41,7 +42,7 @@ public class ServiceAuthorizationManager {
    * 
    * @deprecated Use
    *             {@link CommonConfigurationKeys#HADOOP_SECURITY_AUTHORIZATION}
-   *             Instead.
+   *             instead.
    */
   @Deprecated
   public static final String SERVICE_AUTHORIZATION_CONFIG = 

+ 2 - 2
src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java

@@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
@@ -73,7 +73,7 @@ public class DelegationTokenServlet extends DfsServlet {
           String s = NameNode.getAddress(conf).getAddress().getHostAddress()
                      + ":" + NameNode.getAddress(conf).getPort();
           token.setService(new Text(s));
-          TokenStorage ts = new TokenStorage();
+          Credentials ts = new Credentials();
           ts.addToken(new Text(ugi.getShortUserName()), token);
           ts.write(dosFinal);
           dosFinal.close();

+ 3 - 3
src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.DelegationTokenServlet;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -123,7 +123,7 @@ public class DelegationTokenFetcher {
       + ":" + dfs.getUri().getPort();
     token.setService(new Text(nnAddress));
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     ts.addToken(new Text(shortName), token);
     ts.write(out);
   }
@@ -151,7 +151,7 @@ public class DelegationTokenFetcher {
       URLConnection connection = remoteURL.openConnection();
       
       InputStream in = connection.getInputStream();
-      TokenStorage ts = new TokenStorage();
+      Credentials ts = new Credentials();
       dis = new DataInputStream(in);
       ts.readFields(dis);
       file = new DataOutputStream(new FileOutputStream(filename));

+ 6 - 2
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 
@@ -743,9 +744,12 @@ public class TrackerDistributedCacheManager {
   /**
    * For each archive or cache file - get the corresponding delegation token
    * @param job
+   * @param credentials
    * @throws IOException
    */
-  public static void getDelegationTokens(Configuration job) throws IOException {
+  public static void getDelegationTokens(Configuration job, 
+                                         Credentials credentials) 
+  throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
 
@@ -765,6 +769,6 @@ public class TrackerDistributedCacheManager {
       }
     }
 
-    TokenCache.obtainTokensForNamenodes(ps, job);
+    TokenCache.obtainTokensForNamenodes(credentials, ps, job);
   }
 }

+ 11 - 5
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
@@ -71,12 +71,12 @@ class Child {
     // file name is passed thru env
     String jobTokenFile = 
       System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    TokenStorage ts = 
-      TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
-    LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
+    Credentials credentials = 
+      TokenCache.loadTokens(jobTokenFile, defaultConf);
+    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
     
-    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
     jt.setService(new Text(address.getAddress().getHostAddress() + ":"
         + address.getPort()));
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
@@ -86,6 +86,9 @@ class Child {
      = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
     taskOwner.addToken(jt);
     
+    // Set the credentials
+    defaultConf.setCredentials(credentials);
+    
     final TaskUmbilicalProtocol umbilical = 
       taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
         @Override
@@ -170,7 +173,10 @@ class Child {
         //create the index file so that the log files 
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        
+        // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
+        job.setCredentials(defaultConf.getCredentials());
 
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.

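[Review note] With the static TokenCache state gone, the task JVM now threads the loaded credentials through the JobConf. A condensed sketch of the flow above, under the assumption that the sketch lives in org.apache.hadoop.mapred (setCredentials is package-private in this patch):

    package org.apache.hadoop.mapred; // setCredentials() is package-private

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    class ChildFlowSketch {
      static void loadAndAttach(JobConf defaultConf) throws IOException {
        // The TaskTracker passes the token file location via the environment.
        String jobTokenFile =
            System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
        Credentials credentials =
            TokenCache.loadTokens(jobTokenFile, defaultConf);
        // Carried on the conf; Child later copies it into each task's JobConf.
        defaultConf.setCredentials(credentials);
      }
    }
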
+ 1 - 1
src/mapred/org/apache/hadoop/mapred/FileInputFormat.java

@@ -157,7 +157,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
     }
 
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(dirs, job);
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
     
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java

@@ -109,7 +109,8 @@ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
       setOutputPath(job, outDir);
       
       // get delegation token for the outDir's file system
-      TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
+                                          new Path[] {outDir}, job);
       
       // check its existence
       if (fs.exists(outDir)) {

+ 13 - 8
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -661,7 +662,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     //  set the public/private visibility of the archives and files
     TrackerDistributedCacheManager.determineCacheVisibilities(job);
     // get DelegationTokens for cache files
-    TrackerDistributedCacheManager.getDelegationTokens(job);
+    TrackerDistributedCacheManager.getDelegationTokens(job, 
+                                                       job.getCredentials());
 
     String originalJarPath = job.getJar();
 
@@ -761,8 +763,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           copyAndConfigureFiles(jobCopy, submitJobDir);
 
           // get delegation token for the dir
-          TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir},
-              jobCopy);
+          TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
+                                              new Path [] {submitJobDir},
+                                              jobCopy);
 
           Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
           int reduces = jobCopy.getNumReduceTasks();
@@ -801,9 +804,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           //
           // Now, actually submit the job (using the submit name)
           //
-          populateTokenCache(jobCopy);
+          populateTokenCache(jobCopy, jobCopy.getCredentials());
           status = jobSubmitClient.submitJob(
-              jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
+              jobId, submitJobDir.toString(), jobCopy.getCredentials());
           if (status != null) {
             return new NetworkedJob(status);
           } else {
@@ -1877,7 +1880,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   
   //get secret keys and tokens and store them into TokenCache
   @SuppressWarnings("unchecked")
-  private void populateTokenCache(Configuration conf) throws IOException{
+  private void populateTokenCache(Configuration conf, Credentials credentials) 
+  throws IOException{
     // create TokenStorage object with user secretKeys
     String tokensFileName = conf.get("tokenCacheFile");
     if(tokensFileName != null) {
@@ -1892,7 +1896,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           mapper.readValue(new File(localFileName), Map.class);
 
         for(Map.Entry<String, String> ent: nm.entrySet()) {
-          TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+          credentials.addSecretKey(new Text(ent.getKey()), 
+                                   ent.getValue().getBytes());
         }
       } catch (JsonMappingException e) {
         json_error = true;
@@ -1911,7 +1916,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       for(int i=0; i< nameNodes.length; i++) {
         ps[i] = new Path(nameNodes[i]);
       }
-      TokenCache.obtainTokensForNamenodes(ps, conf);
+      TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
     }
   }
 }

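[Review note] populateTokenCache() still honors the tokenCacheFile knob, but now parses the secrets into the job's own Credentials instead of a JVM-wide singleton. A client-side sketch; the path and file contents are hypothetical:

    import org.apache.hadoop.mapred.JobConf;

    public class TokenCacheFileExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // A local JSON file of {"alias": "secret", ...} entries; at submit
        // time each entry lands in job.getCredentials() via addSecretKey().
        job.set("tokenCacheFile", "/tmp/secret-keys.json");
      }
    }
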
+ 21 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -314,6 +315,8 @@ public class JobConf extends Configuration {
   public static final String MAPRED_REDUCE_TASK_ENV =
     "mapred.reduce.child.env";
 
+  private Credentials credentials = new Credentials();
+  
   /**
    * Construct a map/reduce job configuration.
    */
@@ -338,6 +341,12 @@ public class JobConf extends Configuration {
    */
   public JobConf(Configuration conf) {
     super(conf);
+    
+    if (conf instanceof JobConf) {
+      JobConf that = (JobConf)conf;
+      credentials = that.credentials;
+    }
+    
     checkAndWarnDeprecation();
   }
 
@@ -384,6 +393,18 @@ public class JobConf extends Configuration {
     checkAndWarnDeprecation();
   }
 
+  /**
+   * Get credentials for the job.
+   * @return credentials for the job
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+  
+  void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+  
   /**
    * Get the user jar for the map-reduce job.
    * 

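[Review note] The new accessor makes credentials travel with the job configuration, and the JobConf(Configuration) constructor above shares (does not clone) the source JobConf's Credentials. A small usage sketch; the alias and value are hypothetical:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.security.Credentials;

    public class JobCredentialsExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        Credentials creds = job.getCredentials();
        creds.addSecretKey(new Text("my.alias"), "s3kr3t".getBytes());

        // The copy constructor aliases the same Credentials object:
        JobConf copy = new JobConf(job);
        assert copy.getCredentials() == job.getCredentials();
      }
    }
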
+ 4 - 4
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -60,7 +60,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -134,7 +134,7 @@ public class JobInProgress {
   JobPriority priority = JobPriority.NORMAL;
   final JobTracker jobtracker;
   
-  protected TokenStorage tokenStorage;
+  protected Credentials tokenStorage;
 
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -335,7 +335,7 @@ public class JobInProgress {
   }
 
   JobInProgress(JobTracker jobtracker, final JobConf default_conf, 
-      JobInfo jobInfo, int rCount, TokenStorage ts) 
+      JobInfo jobInfo, int rCount, Credentials ts) 
   throws IOException, InterruptedException {
     this.restartCount = rCount;
     this.jobId = JobID.downgrade(jobInfo.getJobID());
@@ -3218,7 +3218,7 @@ public class JobInProgress {
     
     // add this token to the tokenStorage
     if(tokenStorage == null)
-      tokenStorage = new TokenStorage();
+      tokenStorage = new Credentials();
 
     TokenCache.setJobToken(token, tokenStorage);
         

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -91,7 +91,7 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    * that job.
    * The job files should be submitted in <b>jobSubmitDir</b>.
    */
-  public JobStatus submitJob(JobID jobName, String jobSubmitDir, TokenStorage ts) 
+  public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) 
   throws IOException;
 
   /**

+ 3 - 6
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -108,7 +108,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -206,7 +206,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   
   private Clock clock;
 
-  private TokenStorage tokenStorage;
+  private Credentials tokenStorage;
   private final JobTokenSecretManager jobTokenSecretManager
     = new JobTokenSecretManager();
 
@@ -3632,7 +3632,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    */
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throws IOException {
     JobInfo jobInfo = null;
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -3986,9 +3986,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
   
-  /**
-   * @see ClientProtocol#setJobPriority(JobID, String)
-   */
   public synchronized void setJobPriority(JobID jobid, 
                                           String priority)
                                           throws IOException {

+ 6 - 4
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
@@ -409,10 +409,12 @@ class LocalJobRunner implements JobSubmissionProtocol {
     return new JobID("local", ++jobid);
   }
 
-  public JobStatus submitJob(JobID jobid, String jobSubmitDir, TokenStorage ts) 
+  public JobStatus submitJob(JobID jobid, String jobSubmitDir, 
+                             Credentials credentials) 
   throws IOException {
-    TokenCache.setTokenStorage(ts);
-    return new Job(jobid, jobSubmitDir).status;
+    Job job = new Job(jobid, jobSubmitDir);
+    job.job.setCredentials(credentials);
+    return job.status;
   }
 
   public void killJob(JobID id) {

+ 1 - 0
src/mapred/org/apache/hadoop/mapred/Reporter.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progressable;
 
 /** 

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -579,7 +580,7 @@ abstract public class Task implements Writable, Configurable {
       } else {
         return split;
       }
-    }    
+    }  
     /** 
      * The communication thread handles communication with the parent (Task Tracker). 
      * It sends progress updates if progress has been made or if the task needs to 

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -103,7 +103,7 @@ import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -1019,7 +1019,7 @@ public class TaskTracker
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
     
     
-    TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);

+ 11 - 0
src/mapred/org/apache/hadoop/mapreduce/JobContext.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -47,6 +48,7 @@ public class JobContext {
     "mapreduce.partitioner.class";
 
   protected final org.apache.hadoop.mapred.JobConf conf;
+  protected final Credentials credentials;
   private final JobID jobId;
 
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
@@ -67,6 +69,7 @@ public class JobContext {
   
   public JobContext(Configuration conf, JobID jobId) {
     this.conf = new org.apache.hadoop.mapred.JobConf(conf);
+    this.credentials = this.conf.getCredentials();
     this.jobId = jobId;
     try {
       this.ugi = UserGroupInformation.getCurrentUser();
@@ -83,6 +86,14 @@ public class JobContext {
     return conf;
   }
 
+  /**
+   * Get credentials for the job.
+   * @return credentials for the job
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
   /**
    * Get the unique ID for the job.
    * @return the object with the job id

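[Review note] The new-API JobContext mirrors JobConf.getCredentials(), capturing the wrapped conf's Credentials at construction time. A minimal sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.security.Credentials;

    public class JobContextCredentials {
      public static void main(String[] args) {
        JobContext ctx =
            new JobContext(new Configuration(), new JobID("local", 1));
        Credentials creds = ctx.getCredentials();
        System.out.println("# tokens = " + creds.getAllTokens().size());
      }
    }
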
+ 2 - 1
src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

@@ -193,7 +193,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     }
     
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
+                                        job.getConfiguration());
 
     List<IOException> errors = new ArrayList<IOException>();
     

+ 3 - 1
src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

@@ -122,7 +122,9 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
     }
     
     // get delegation token for outDir's file system
-    TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
+                                        new Path[] {outDir}, 
+                                        job.getConfiguration());
 
     if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 

+ 20 - 85
src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,44 +53,17 @@ public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
 
-  private static TokenStorage tokenStorage;
-  
   /**
    * auxiliary method to get user's secret keys..
    * @param alias
    * @return secret key from the storage
    */
-  public static byte[] getSecretKey(Text alias) {
-    if(tokenStorage == null)
+  public static byte[] getSecretKey(Credentials credentials, Text alias) {
+    if(credentials == null)
       return null;
-    return tokenStorage.getSecretKey(alias);
-  }
-  
-  /**
-   * auxiliary methods to store user'  s secret keys
-   * @param alias
-   * @param key
-   */
-  public static void addSecretKey(Text alias, byte[] key) {
-    getTokenStorage().addSecretKey(alias, key);
-  }
-  
-  /**
-   * auxiliary method to add a delegation token
-   */
-  public static void addDelegationToken(
-      String namenode, Token<? extends TokenIdentifier> t) {
-    getTokenStorage().addToken(new Text(namenode), t);
+    return credentials.getSecretKey(alias);
   }
 
-  /**
-   * auxiliary method 
-   * @return all the available tokens
-   */
-  public static Collection<Token<? extends TokenIdentifier>> getAllTokens() {
-    return getTokenStorage().getAllTokens();
-  }
-  
   /**
    * Convenience method to obtain delegation tokens from namenodes 
    * corresponding to the paths passed.
@@ -98,15 +71,17 @@ public class TokenCache {
    * @param conf configuration
    * @throws IOException
    */
-  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
+  public static void obtainTokensForNamenodes(Credentials credentials,
+                                              Path [] ps, Configuration conf) 
   throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    obtainTokensForNamenodesInternal(ps, conf);
+    obtainTokensForNamenodesInternal(credentials, ps, conf);
   }
 
-  static void obtainTokensForNamenodesInternal(Path [] ps, Configuration conf)
+  static void obtainTokensForNamenodesInternal(Credentials credentials,
+                                               Path [] ps, Configuration conf)
   throws IOException {
     // get jobtracker principal id (for the renewer)
     Text jtCreds = new Text(conf.get(JobTracker.JT_USER_NAME, ""));
@@ -120,7 +95,7 @@ public class TokenCache {
 
         // see if we already have the token
         Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(fs_addr); 
+          TokenCache.getDelegationToken(credentials, fs_addr); 
         if(token != null) {
           LOG.debug("DT for " + token.getService()  + " is already present");
           continue;
@@ -131,7 +106,7 @@ public class TokenCache {
           throw new IOException("Token from " + fs_addr + " is null");
 
         token.setService(new Text(fs_addr));
-        TokenCache.addDelegationToken(fs_addr, token);
+        credentials.addToken(new Text(fs_addr), token);
         LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
             ";t.service="+token.getService());
       }
@@ -159,64 +134,24 @@ public class TokenCache {
   @SuppressWarnings("unchecked")
   //@InterfaceAudience.Private
   public static Token<DelegationTokenIdentifier> 
-  getDelegationToken(String namenode) {
-    return (Token<DelegationTokenIdentifier>)getTokenStorage().
-    getToken(new Text(namenode));
+  getDelegationToken(Credentials credentials, String namenode) {
+    return (Token<DelegationTokenIdentifier>)
+        credentials.getToken(new Text(namenode));
   }
 
-  /**
-   * @return TokenStore object
-   */
-  //@InterfaceAudience.Private
-  public static TokenStorage getTokenStorage() {
-    if(tokenStorage==null)
-      tokenStorage = new TokenStorage();
-
-    return tokenStorage;
-  }
-
-  /**
-   * sets TokenStorage
-   * @param ts
-   */
-  //@InterfaceAudience.Private
-  public static void setTokenStorage(TokenStorage ts) {
-    if(tokenStorage != null)
-      LOG.warn("Overwriting existing token storage with # keys=" + 
-          tokenStorage.numberOfSecretKeys());
-    tokenStorage = ts;
-  }
-  
-  /**
-   * load token storage and stores it
-   * @param conf
-   * @return Loaded TokenStorage object
-   * @throws IOException
-   */
-  //@InterfaceAudience.Private
-  public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
-  throws IOException {
-    if(tokenStorage != null)
-      return tokenStorage;
-    
-    tokenStorage = loadTokens(fileName, conf);
-    
-    return tokenStorage;
-  }
-  
   /**
    * load job token from a file
    * @param conf
    * @throws IOException
    */
   //@InterfaceAudience.Private
-  public static TokenStorage loadTokens(String jobTokenFile, JobConf conf) 
+  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
     Path localJobTokenFile = new Path (jobTokenFile);
     FileSystem localFS = FileSystem.getLocal(conf);
     FSDataInputStream in = localFS.open(localJobTokenFile);
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     ts.readFields(in);
 
     if(LOG.isDebugEnabled()) {
@@ -233,8 +168,8 @@ public class TokenCache {
    */
   //@InterfaceAudience.Private
   public static void setJobToken(Token<? extends TokenIdentifier> t, 
-      TokenStorage ts) {
-    ts.addToken(JOB_TOKEN, t);
+      Credentials credentials) {
+    credentials.addToken(JOB_TOKEN, t);
   }
   /**
    * 
@@ -242,8 +177,8 @@ public class TokenCache {
    */
   //@InterfaceAudience.Private
   @SuppressWarnings("unchecked")
-  public static Token<JobTokenIdentifier> getJobToken(TokenStorage ts) {
-    return (Token<JobTokenIdentifier>) ts.getToken(JOB_TOKEN);
+  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
   }
 
   /**

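[Review note] After this patch every TokenCache entry point takes an explicit Credentials argument rather than consulting static state. A caller-side sketch; the HDFS URI is hypothetical, and the call returns immediately unless security is enabled:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;

    public class ObtainTokensExample {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        Credentials creds = conf.getCredentials();
        Path[] inputs = { new Path("hdfs://nn.example.com:8020/user/data") };
        // Fetches one delegation token per distinct namenode, skipping any
        // already present in creds; a no-op when security is disabled.
        TokenCache.obtainTokensForNamenodes(creds, inputs, conf);
      }
    }
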
+ 2 - 2
src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java

@@ -38,7 +38,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -110,7 +110,7 @@ public class DelegationTokenRenewal {
   
   @SuppressWarnings("unchecked")
   public static synchronized void registerDelegationTokensForRenewal(
-      JobID jobId, TokenStorage ts, Configuration conf) {
+      JobID jobId, Credentials ts, Configuration conf) {
     if(ts==null)
       return; //nothing to add
     

+ 34 - 33
src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java

@@ -43,13 +43,14 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -57,6 +58,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -65,7 +67,27 @@ public class TestTokenCache {
   private static final int NUM_OF_KEYS = 10;
 
   // my sleep class - adds check for tokenCache
-  static class MySleepJob extends SleepJob {
+  static class MySleepJob extends SleepJob implements JobConfigurable {
+    Credentials ts;
+    
+    public void configure(JobConf job) {
+        //Credentials in the job will not have delegation tokens
+        //because security is disabled. Fetch delegation tokens
+        //and populate the credential in the job.
+    	try {
+          ts = job.getCredentials();
+          Path p1 = new Path("file1");
+          p1 = p1.getFileSystem(job).makeQualified(p1);
+          Credentials cred = new Credentials();
+          TokenCache.obtainTokensForNamenodesInternal(cred, new Path [] {p1}, job);
+          for (Token<? extends TokenIdentifier> t: cred.getAllTokens()) {
+            ts.addToken(new Text("Hdfs"), t);
+          }
+    	} catch (IOException e) {
+    		Assert.fail("Exception "+e);
+    	}
+    }
+    
     /**
      * attempts to access tokenCache as from client
      */
@@ -74,23 +96,12 @@ public class TestTokenCache {
         OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
         throws IOException {
       // get token storage and a key
-      TokenStorage ts = TokenCache.getTokenStorage();
-      byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
-      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      byte[] key1 = ts.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
       int dts_size = 0;
       if(dts != null)
         dts_size = dts.size();
 
-      System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
-          "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
-          ";jobToken = " +  (ts==null? "n/a":TokenCache.getJobToken(ts)) +
-          "; alias1 key=" + new String(key1) + 
-          "; dts size= " + dts_size);
-    
-      for(Token<? extends TokenIdentifier> t : dts) {
-        System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
-      }
-
       if(dts.size() != 2) { // one job token and one delegation token
         throw new RuntimeException("tokens are not available"); // fail the test
       }
@@ -143,11 +154,7 @@ public class TestTokenCache {
     
     p1 = new Path("file1");
     p2 = new Path("file2");
-    
     p1 = fs.makeQualified(p1);
-    // do not qualify p2
-    TokenCache.setTokenStorage(new TokenStorage());
-    TokenCache.obtainTokensForNamenodesInternal(new Path [] {p1, p2}, jConf);
   }
 
   @AfterClass
@@ -176,7 +183,6 @@ public class TestTokenCache {
       throw new IOException(e);
     }
     
-    System.out.println("writing secret keys into " + tokenFileName);
     try {
       File p  = new File(tokenFileName.getParent().toString());
       p.mkdirs();
@@ -193,8 +199,6 @@ public class TestTokenCache {
     Map<String, String> map;
     map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
     assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
-    
-    System.out.println("file " + tokenFileName + " verified; size="+ map.size());
   }
   
   /**
@@ -203,9 +207,6 @@ public class TestTokenCache {
    */
   @Test
   public void testTokenCache() throws IOException {
-    
-    System.out.println("running dist job");
-    
     // make sure JT starts
     jConf = mrCluster.createJobConf();
     
@@ -241,12 +242,10 @@ public class TestTokenCache {
    */
   @Test
   public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
-    
-    System.out.println("running local job");
     // this is local job
     String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; 
     jConf.set("tokenCacheFile", tokenFileName.toString());
-    
+
     int res = -1;
     try {
       res = ToolRunner.run(jConf, new MySleepJob(), args);
@@ -262,21 +261,23 @@ public class TestTokenCache {
   public void testGetTokensForNamenodes() throws IOException {
     FileSystem fs = dfsCluster.getFileSystem();
 
+    Credentials credentials = new Credentials();
+    TokenCache.obtainTokensForNamenodesInternal(credentials, new Path [] {p1, p2},
+                                        jConf);
     // this token is keyed by hostname:port key.
     String fs_addr = TokenCache.buildDTServiceName(p1.toUri()); 
-    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
-    System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " +  nnt);
+    Token<DelegationTokenIdentifier> nnt =
+      TokenCache.getDelegationToken(credentials, fs_addr);
 
     assertNotNull("Token for nn is null", nnt);
 
     // verify the size
-    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    Collection<Token<? extends TokenIdentifier>> tns =
+      credentials.getAllTokens();
     assertEquals("number of tokens is not 1", 1, tns.size());
 
     boolean found = false;
     for(Token<? extends TokenIdentifier> t: tns) {
-      System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
-
       if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
           t.getService().equals(new Text(fs_addr))) {
         found = true;

+ 3 - 3
src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java

@@ -38,7 +38,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
@@ -238,7 +238,7 @@ public class TestDelegationTokenRenewal {
     String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
     String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     
     // register the token for renewal
     ts.addToken(new Text(nn1), token1);
@@ -273,7 +273,7 @@ public class TestDelegationTokenRenewal {
     // add another token ( that expires in 2 secs). Then remove it, before
     // time is up.
     // Wait for 3 secs , and make sure no renews were called
-    ts = new TokenStorage();
+    ts = new Credentials();
     MyToken token4 = dfs.getDelegationToken(new Text("user4"));
     
     //to cause this one to be set for renew in 2 secs

+ 4 - 4
src/test/org/apache/hadoop/security/TestTokenStorage.java → src/test/org/apache/hadoop/security/TestJobCredentials.java

@@ -40,14 +40,14 @@ import javax.crypto.KeyGenerator;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
-public class TestTokenStorage {
+public class TestJobCredentials {
   private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
   private static final File tmpDir =
     new File(System.getProperty("test.build.data", "/tmp"), "mapred");  
@@ -62,7 +62,7 @@ public class TestTokenStorage {
   public <T extends TokenIdentifier> void testReadWriteStorage() 
   throws IOException, NoSuchAlgorithmException{
     // create tokenStorage Object
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     
     Token<T> token1 = new Token();
     Token<T> token2 = new Token();
@@ -98,7 +98,7 @@ public class TestTokenStorage {
     // open and read it back
     DataInputStream dis = 
       new DataInputStream(new FileInputStream(tmpFileName));    
-    ts = new TokenStorage();
+    ts = new Credentials();
     ts.readFields(dis);
     dis.close();
     

+ 2 - 2
src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java

@@ -32,7 +32,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -80,7 +80,7 @@ public class TestDelegationTokenFetcher {
     new DelegationTokenFetcher(dfs, out, ugi).go();
     
     // now read the data back in and verify correct values
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     DataInputStream dis = 
       new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
     ts.readFields(dis);

+ 8 - 5
src/tools/org/apache/hadoop/tools/DistCp.java

@@ -621,14 +621,15 @@ public class DistCp implements Tool {
   }
 
   /** Sanity check for srcPath */
-  private static void checkSrcPath(Configuration conf, List<Path> srcPaths
-      ) throws IOException {
+  private static void checkSrcPath(Configuration conf, 
+                                   List<Path> srcPaths, JobConf jobConf)
+  throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
     
     // get tokens for all the required FileSystems..
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
-    TokenCache.obtainTokensForNamenodes(ps, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
 
     for (Path p : srcPaths) {
       FileSystem fs = p.getFileSystem(conf);
@@ -649,9 +650,10 @@ public class DistCp implements Tool {
       ) throws IOException {
     LOG.info("srcPaths=" + args.srcs);
     LOG.info("destPath=" + args.dst);
-    checkSrcPath(conf, args.srcs);
 
     JobConf job = createJobConf(conf);
+    
+    checkSrcPath(conf, args.srcs, job);
     if (args.preservedAttributes != null) {
       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
     }
@@ -1027,7 +1029,8 @@ public class DistCp implements Tool {
     FileSystem dstfs = args.dst.getFileSystem(conf);
     
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), 
+                                        new Path[] {args.dst}, conf);
     
     boolean dstExists = dstfs.exists(args.dst);
     boolean dstIsDir = false;