|
@@ -22,6 +22,7 @@ import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Properties;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
@@ -37,7 +38,6 @@ import org.apache.hadoop.examples.SleepJob;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
-import org.apache.hadoop.security.UnixUserGroupInformation;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
|
|
public class TestQueueManager extends TestCase {
|
|
@@ -46,7 +46,26 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
private MiniDFSCluster miniDFSCluster;
|
|
|
private MiniMRCluster miniMRCluster;
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * For some tests it is necessary to sandbox them in a doAs with a fake user
|
|
|
+ * due to bug HADOOP-6527, which wipes out real group mappings. It's also
|
|
|
+ * necessary to then add the real user running the test to the fake users
|
|
|
+ * so that child processes can write to the DFS.
|
|
|
+ */
|
|
|
+ private UserGroupInformation createNecessaryUsers() throws IOException {
|
|
|
+ // Add real user to fake groups mapping so that child processes (tasks)
|
|
|
+ // will have permissions on the dfs
|
|
|
+ String j = UserGroupInformation.getCurrentUser().getUserName();
|
|
|
+ UserGroupInformation.createUserForTesting(j, new String [] { "supergroup"});
|
|
|
+
|
|
|
+
|
|
|
+ // Create a fake superuser for all processes to execute within
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.createUserForTesting("Zork",
|
|
|
+ new String [] {"Zork"});
|
|
|
+ return ugi;
|
|
|
+ }
|
|
|
+
|
|
|
public void testDefaultQueueConfiguration() {
|
|
|
JobConf conf = new JobConf();
|
|
|
QueueManager qMgr = new QueueManager(conf);
|
|
@@ -78,23 +97,27 @@ public class TestQueueManager extends TestCase {
|
|
|
assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
|
|
|
}
|
|
|
|
|
|
- public void testAllEnabledACLForJobSubmission() throws IOException {
|
|
|
+ public void testAllEnabledACLForJobSubmission()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
|
|
|
verifyJobSubmission(conf, true);
|
|
|
}
|
|
|
|
|
|
- public void testAllDisabledACLForJobSubmission() throws IOException {
|
|
|
+ public void testAllDisabledACLForJobSubmission()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
|
|
|
verifyJobSubmission(conf, false);
|
|
|
}
|
|
|
|
|
|
- public void testUserDisabledACLForJobSubmission() throws IOException {
|
|
|
+ public void testUserDisabledACLForJobSubmission()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
"3698-non-existent-user");
|
|
|
verifyJobSubmission(conf, false);
|
|
|
}
|
|
|
|
|
|
- public void testDisabledACLForNonDefaultQueue() throws IOException {
|
|
|
+ public void testDisabledACLForNonDefaultQueue()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
// allow everyone in default queue
|
|
|
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
|
|
|
// setup a different queue
|
|
@@ -105,7 +128,8 @@ public class TestQueueManager extends TestCase {
|
|
|
verifyJobSubmission(conf, false, "q1");
|
|
|
}
|
|
|
|
|
|
- public void testSubmissionToInvalidQueue() throws IOException{
|
|
|
+ public void testSubmissionToInvalidQueue()
|
|
|
+ throws IOException, InterruptedException{
|
|
|
JobConf conf = new JobConf();
|
|
|
conf.set("mapred.queue.names","default");
|
|
|
setUpCluster(conf);
|
|
@@ -121,10 +145,10 @@ public class TestQueueManager extends TestCase {
|
|
|
fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
|
|
|
}
|
|
|
|
|
|
- public void testEnabledACLForNonDefaultQueue() throws IOException,
|
|
|
- LoginException {
|
|
|
+ public void testEnabledACLForNonDefaultQueue()
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
|
// login as self...
|
|
|
- UserGroupInformation ugi = UnixUserGroupInformation.login();
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
String userName = ugi.getUserName();
|
|
|
// allow everyone in default queue
|
|
|
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
|
|
@@ -137,9 +161,9 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
public void testUserEnabledACLForJobSubmission()
|
|
|
- throws IOException, LoginException {
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
|
// login as self...
|
|
|
- UserGroupInformation ugi = UnixUserGroupInformation.login();
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
String userName = ugi.getUserName();
|
|
|
JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
"3698-junk-user," + userName
|
|
@@ -148,56 +172,126 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
public void testGroupsEnabledACLForJobSubmission()
|
|
|
- throws IOException, LoginException {
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
|
// login as self, get one group, and add in allowed list.
|
|
|
- UserGroupInformation ugi = UnixUserGroupInformation.login();
|
|
|
- String[] groups = ugi.getGroupNames();
|
|
|
- assertTrue(groups.length > 0);
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
- "3698-junk-user1,3698-junk-user2 "
|
|
|
- + groups[groups.length-1]
|
|
|
- + ",3698-junk-group");
|
|
|
- verifyJobSubmission(conf, true);
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ String[] groups = UserGroupInformation.getCurrentUser().getGroupNames();
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
+ "3698-junk-user1,3698-junk-user2 "
|
|
|
+ + groups[groups.length-1]
|
|
|
+ + ",3698-junk-group");
|
|
|
+ verifyJobSubmission(conf, true);
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
}
|
|
|
|
|
|
- public void testAllEnabledACLForJobKill() throws IOException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
|
|
|
- verifyJobKill(conf, true);
|
|
|
+ public void testAllEnabledACLForJobKill()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
|
|
|
+ verifyJobKill(conf, true);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- public void testAllDisabledACLForJobKill() throws IOException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
|
|
|
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
|
|
|
+ public void testAllDisabledACLForJobKill()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ // Create a fake superuser for all processes to execute within
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ // No one should be able to kill jobs
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
|
|
|
+ // Run as dummy-user, who (obviously) is not able to kill the job,
|
|
|
+ // and expect him to fail
|
|
|
+ verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-group");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- public void testOwnerAllowedForJobKill() throws IOException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
+ public void testOwnerAllowedForJobKill()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
"junk-user");
|
|
|
- verifyJobKill(conf, true);
|
|
|
+ verifyJobKill(conf, true);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- public void testUserDisabledACLForJobKill() throws IOException {
|
|
|
- //setup a cluster allowing a user to submit
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
- "dummy-user");
|
|
|
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
|
|
|
- }
|
|
|
+ public void testUserDisabledACLForJobKill()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ //setup a cluster allowing a user to submit
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
+ "dummy-user");
|
|
|
+ verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-group");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
- public void testUserEnabledACLForJobKill() throws IOException,
|
|
|
- LoginException {
|
|
|
- // login as self...
|
|
|
- UserGroupInformation ugi = UnixUserGroupInformation.login();
|
|
|
- String userName = ugi.getUserName();
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
- "dummy-user,"+userName);
|
|
|
- verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
|
|
|
- }
|
|
|
+ public void testUserEnabledACLForJobKill()
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
|
|
|
- public void testUserDisabledForJobPriorityChange() throws IOException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ // login as self...
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ String userName = ugi.getUserName();
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
+ "dummy-user,"+userName);
|
|
|
+ verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-group");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testUserDisabledForJobPriorityChange()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+
|
|
|
+ JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
"junk-user");
|
|
|
- verifyJobPriorityChangeAsOtherUser(conf, false,
|
|
|
- "junk-user,junk-user-group");
|
|
|
+ verifyJobPriorityChangeAsOtherUser(conf, false,
|
|
|
+ "junk-user,dummy-group");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -232,7 +326,7 @@ public class TestQueueManager extends TestCase {
|
|
|
//Create a new configuration to be used with QueueManager
|
|
|
JobConf conf = new JobConf();
|
|
|
QueueManager queueManager = new QueueManager(conf);
|
|
|
- UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
//Job Submission should fail because ugi to be used is set to blank.
|
|
|
assertFalse("User Job Submission Succeeded before refresh.",
|
|
|
queueManager.hasAccess("default", QueueManager.QueueOperation.
|
|
@@ -245,10 +339,8 @@ public class TestQueueManager extends TestCase {
|
|
|
SUBMIT_JOB, ugi));
|
|
|
|
|
|
//Test job submission as alternate user.
|
|
|
- Configuration alternateUserConfig = new Configuration();
|
|
|
- alternateUserConfig.set("hadoop.job.ugi","u1,users");
|
|
|
UserGroupInformation alternateUgi =
|
|
|
- UserGroupInformation.readFrom(alternateUserConfig);
|
|
|
+ UserGroupInformation.createUserForTesting("u1", new String[]{"user"});
|
|
|
assertTrue("Alternate User Job Submission failed before refresh.",
|
|
|
queueManager.hasAccess("q2", QueueManager.QueueOperation.
|
|
|
SUBMIT_JOB, alternateUgi));
|
|
@@ -310,7 +402,7 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
//properties for mapred-queue-acls.xml
|
|
|
Properties queueConfProps = new Properties();
|
|
|
- UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
|
|
|
queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
|
|
|
queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
|
|
@@ -379,12 +471,12 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
private void verifyJobSubmission(JobConf conf, boolean shouldSucceed)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
verifyJobSubmission(conf, shouldSucceed, "default");
|
|
|
}
|
|
|
|
|
|
private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
|
|
|
- String queue) throws IOException {
|
|
|
+ String queue) throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
runAndVerifySubmission(conf, shouldSucceed, queue, null);
|
|
@@ -395,7 +487,7 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
|
|
|
String queue, String userInfo)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
try {
|
|
|
RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
|
|
|
if (shouldSucceed) {
|
|
@@ -428,7 +520,7 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
private void verifyJobKill(JobConf conf, boolean shouldSucceed)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
|
|
@@ -470,7 +562,7 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
|
|
|
String otherUserInfo)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
// submit a job as another user.
|
|
@@ -512,7 +604,7 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
private void verifyJobPriorityChangeAsOtherUser(JobConf conf,
|
|
|
boolean shouldSucceed, String otherUserInfo)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
// submit job as another user.
|
|
@@ -552,6 +644,7 @@ public class TestQueueManager extends TestCase {
|
|
|
private void setUpCluster(JobConf conf) throws IOException {
|
|
|
miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
|
|
|
FileSystem fileSys = miniDFSCluster.getFileSystem();
|
|
|
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
|
|
|
TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys,
|
|
|
conf.get("mapreduce.jobtracker.staging.root.dir",
|
|
|
"/tmp/hadoop/mapred/staging"));
|
|
@@ -568,7 +661,7 @@ public class TestQueueManager extends TestCase {
|
|
|
private RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
long mapSleepTime, long reduceSleepTime,
|
|
|
boolean shouldComplete)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
return submitSleepJob(numMappers, numReducers, mapSleepTime,
|
|
|
reduceSleepTime, shouldComplete, null);
|
|
|
}
|
|
@@ -576,19 +669,20 @@ public class TestQueueManager extends TestCase {
|
|
|
private RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
long mapSleepTime, long reduceSleepTime,
|
|
|
boolean shouldComplete, String userInfo)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
return submitSleepJob(numMappers, numReducers, mapSleepTime,
|
|
|
reduceSleepTime, shouldComplete, userInfo, null);
|
|
|
}
|
|
|
|
|
|
- private RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
- long mapSleepTime, long reduceSleepTime,
|
|
|
- boolean shouldComplete, String userInfo,
|
|
|
+ private RunningJob submitSleepJob(final int numMappers, final int numReducers,
|
|
|
+ final long mapSleepTime,
|
|
|
+ final long reduceSleepTime, final boolean shouldComplete, String userInfo,
|
|
|
String queueName)
|
|
|
- throws IOException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
JobConf clientConf = new JobConf();
|
|
|
clientConf.set("mapred.job.tracker", "localhost:"
|
|
|
+ miniMRCluster.getJobTrackerPort());
|
|
|
+ UserGroupInformation ugi;
|
|
|
SleepJob job = new SleepJob();
|
|
|
job.setConf(clientConf);
|
|
|
clientConf = job.setupJobConf(numMappers, numReducers,
|
|
@@ -597,18 +691,26 @@ public class TestQueueManager extends TestCase {
|
|
|
if (queueName != null) {
|
|
|
clientConf.setQueueName(queueName);
|
|
|
}
|
|
|
- JobConf jc = new JobConf(clientConf);
|
|
|
+ final JobConf jc = new JobConf(clientConf);
|
|
|
if (userInfo != null) {
|
|
|
- jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
|
|
|
- }
|
|
|
- RunningJob rJob = null;
|
|
|
- if (shouldComplete) {
|
|
|
- rJob = JobClient.runJob(jc);
|
|
|
+ String[] splits = userInfo.split(",");
|
|
|
+ String[] groups = new String[splits.length - 1];
|
|
|
+ System.arraycopy(splits, 1, groups, 0, splits.length - 1);
|
|
|
+ ugi = UserGroupInformation.createUserForTesting(splits[0], groups);
|
|
|
} else {
|
|
|
- // Job should be submitted as 'userInfo'. So both the client as well as
|
|
|
- // the configuration should point to the same UGI.
|
|
|
- rJob = new JobClient(jc).submitJob(jc);
|
|
|
+ ugi = UserGroupInformation.getCurrentUser();
|
|
|
}
|
|
|
+ RunningJob rJob = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
|
|
|
+ public RunningJob run() throws IOException {
|
|
|
+ if (shouldComplete) {
|
|
|
+ return JobClient.runJob(jc);
|
|
|
+ } else {
|
|
|
+ // Job should be submitted as 'userInfo'. So both the client as well as
|
|
|
+ // the configuration should point to the same UGI.
|
|
|
+ return new JobClient(jc).submitJob(jc);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
return rJob;
|
|
|
}
|
|
|
|