|
@@ -358,15 +358,16 @@ public class JobLocalizer {
|
|
|
|
|
|
public void localizeJobFiles(JobID jobid, JobConf jConf,
|
|
|
Path localJobTokenFile, TaskUmbilicalProtocol taskTracker)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
localizeJobFiles(jobid, jConf,
|
|
|
lDirAlloc.getLocalPathForWrite(JOBCONF, ttConf), localJobTokenFile,
|
|
|
taskTracker);
|
|
|
}
|
|
|
|
|
|
- public void localizeJobFiles(JobID jobid, JobConf jConf,
|
|
|
+ public void localizeJobFiles(final JobID jobid, JobConf jConf,
|
|
|
Path localJobFile, Path localJobTokenFile,
|
|
|
- TaskUmbilicalProtocol taskTracker) throws IOException {
|
|
|
+ final TaskUmbilicalProtocol taskTracker)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
// Download the job.jar for this job from the system FS
|
|
|
localizeJobJarFile(jConf);
|
|
|
|
|
@@ -379,9 +380,17 @@ public class JobLocalizer {
|
|
|
TaskTracker.resetNumTasksPerJvm(jConf);
|
|
|
|
|
|
//setup the distributed cache
|
|
|
- long[] sizes = downloadPrivateCache(jConf);
|
|
|
+ final long[] sizes = downloadPrivateCache(jConf);
|
|
|
if (sizes != null) {
|
|
|
- taskTracker.updatePrivateDistributedCacheSizes(jobid, sizes);
|
|
|
+ UserGroupInformation ugi =
|
|
|
+ UserGroupInformation.createRemoteUser(jobid.toString());
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ public Object run() throws IOException {
|
|
|
+ taskTracker.updatePrivateDistributedCacheSizes(jobid, sizes);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
}
|
|
|
|
|
|
// Create job-acls.xml file in job userlog dir and write the needed
|
|
@@ -449,7 +458,8 @@ public class JobLocalizer {
|
|
|
}
|
|
|
|
|
|
public int runSetup(String user, String jobid, Path localJobTokenFile,
|
|
|
- TaskUmbilicalProtocol taskTracker) throws IOException {
|
|
|
+ TaskUmbilicalProtocol taskTracker) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
// load user credentials, configuration
|
|
|
// ASSUME
|
|
|
// let $x = $mapred.local.dir
|