|
@@ -37,6 +37,8 @@ import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
|
|
|
+import javax.security.auth.login.LoginException;
|
|
|
+
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -45,6 +47,7 @@ import org.apache.hadoop.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -56,6 +59,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.mapred.TaskInProgress;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.security.UnixUserGroupInformation;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
@@ -425,6 +429,13 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
return submitJob(job);
|
|
|
}
|
|
|
|
|
|
+ // job files are world-wide readable and owner writable
|
|
|
+ final private static FsPermission JOB_FILE_PERMISSION =
|
|
|
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
|
|
|
+
|
|
|
+ // system directories are world-wide readable and owner readable
|
|
|
+ final static FsPermission SYSTEM_DIR_PERMISSION =
|
|
|
+ FsPermission.createImmutable((short) 0733); // rwx-wx-wx
|
|
|
|
|
|
/**
|
|
|
* Submit a job to the MR system.
|
|
@@ -440,8 +451,21 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
*/
|
|
|
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
|
|
|
InvalidJobConfException, IOException {
|
|
|
+ /*
|
|
|
+ * set this user's id in job configuration, so later job files can be
|
|
|
+ * accessed using this user's id
|
|
|
+ */
|
|
|
+ try {
|
|
|
+ UnixUserGroupInformation.saveToConf(job,
|
|
|
+ UnixUserGroupInformation.UGI_PROPERTY_NAME, UnixUserGroupInformation
|
|
|
+ .login(job));
|
|
|
+ } catch (LoginException e) {
|
|
|
+ throw new IOException("Failed to get the current user's information: "
|
|
|
+ + e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
- // First figure out what fs the JobTracker is using. Copy the
|
|
|
+ // Figure out what fs the JobTracker is using. Copy the
|
|
|
// job to it, under a temporary name. This allows DFS to work,
|
|
|
// and under the local fs also provides UNIX-like object loading
|
|
|
// semantics. (that is, if the job file is deleted right after
|
|
@@ -454,6 +478,7 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
FileSystem fs = getFs();
|
|
|
LOG.debug("default FileSystem: " + fs.getUri());
|
|
|
fs.delete(submitJobDir);
|
|
|
+ FileSystem.mkdirs(fs, submitJobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
|
|
|
Path submitJobFile = new Path(submitJobDir, "job.xml");
|
|
|
Path submitJarFile = new Path(submitJobDir, "job.jar");
|
|
|
Path submitSplitFile = new Path(submitJobDir, "job.split");
|
|
@@ -492,6 +517,7 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
job.setJar(submitJarFile.toString());
|
|
|
fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
|
|
|
fs.setReplication(submitJarFile, replication);
|
|
|
+ fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
|
|
|
} else {
|
|
|
LOG.warn("No job jar file set. User classes may not be found. "+
|
|
|
"See JobConf(Class) or JobConf#setJar(String).");
|
|
@@ -535,7 +561,8 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
}
|
|
|
});
|
|
|
// write the splits to a file for the job tracker
|
|
|
- FSDataOutputStream out = fs.create(submitSplitFile);
|
|
|
+ FSDataOutputStream out = FileSystem.create(fs,
|
|
|
+ submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
|
|
|
try {
|
|
|
writeSplitsFile(splits, out);
|
|
|
} finally {
|
|
@@ -545,7 +572,9 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
|
job.setNumMapTasks(splits.length);
|
|
|
|
|
|
// Write job file to JobTracker's fs
|
|
|
- out = fs.create(submitJobFile, replication);
|
|
|
+ out = FileSystem.create(fs, submitJobFile,
|
|
|
+ new FsPermission(JOB_FILE_PERMISSION));
|
|
|
+
|
|
|
try {
|
|
|
job.write(out);
|
|
|
} finally {
|