|
@@ -38,12 +38,16 @@ import org.apache.hadoop.examples.SleepJob;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.mapred.QueueManager.QueueOperation;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
|
|
public class TestQueueManager extends TestCase {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
|
|
|
-
|
|
|
+
|
|
|
+ String submitAcl = QueueOperation.SUBMIT_JOB.getAclName();
|
|
|
+ String adminAcl = QueueOperation.ADMINISTER_JOBS.getAclName();
|
|
|
+
|
|
|
private MiniDFSCluster miniDFSCluster;
|
|
|
private MiniMRCluster miniMRCluster;
|
|
|
|
|
@@ -56,13 +60,12 @@ public class TestQueueManager extends TestCase {
|
|
|
private UserGroupInformation createNecessaryUsers() throws IOException {
|
|
|
// Add real user to fake groups mapping so that child processes (tasks)
|
|
|
// will have permissions on the dfs
|
|
|
- String j = UserGroupInformation.getCurrentUser().getUserName();
|
|
|
- UserGroupInformation.createUserForTesting(j, new String [] { "supergroup"});
|
|
|
+ String j = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
+ UserGroupInformation.createUserForTesting(j, new String [] { "myGroup"});
|
|
|
|
|
|
-
|
|
|
- // Create a fake superuser for all processes to execute within
|
|
|
+ // Create a fake user for all processes to execute within
|
|
|
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("Zork",
|
|
|
- new String [] {"Zork"});
|
|
|
+ new String [] {"ZorkGroup"});
|
|
|
return ugi;
|
|
|
}
|
|
|
|
|
@@ -73,7 +76,7 @@ public class TestQueueManager extends TestCase {
|
|
|
expQueues.add("default");
|
|
|
verifyQueues(expQueues, qMgr.getQueues());
|
|
|
// pass true so it will fail if the key is not found.
|
|
|
- assertFalse(conf.getBoolean("mapred.acls.enabled", true));
|
|
|
+ assertFalse(conf.getBoolean(JobConf.MR_ACLS_ENABLED, true));
|
|
|
}
|
|
|
|
|
|
public void testMultipleQueues() {
|
|
@@ -86,7 +89,7 @@ public class TestQueueManager extends TestCase {
|
|
|
expQueues.add("Q3");
|
|
|
verifyQueues(expQueues, qMgr.getQueues());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void testSchedulerInfo() {
|
|
|
JobConf conf = new JobConf();
|
|
|
conf.set("mapred.queue.names", "qq1,qq2");
|
|
@@ -99,35 +102,56 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
public void testAllEnabledACLForJobSubmission()
|
|
|
throws IOException, InterruptedException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
|
|
|
- verifyJobSubmission(conf, true);
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "*");
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ String[] groups = ugi.getGroupNames();
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true,
|
|
|
+ ugi.getShortUserName() + "," + groups[groups.length-1]);
|
|
|
}
|
|
|
|
|
|
public void testAllDisabledACLForJobSubmission()
|
|
|
throws IOException, InterruptedException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
|
|
|
- verifyJobSubmission(conf, false);
|
|
|
+ createNecessaryUsers();
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), " ");
|
|
|
+ String userName = "user1";
|
|
|
+ String groupName = "group1";
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);
|
|
|
+
|
|
|
+ // Check if member of supergroup can submit job
|
|
|
+ conf.set(JobConf.MR_SUPERGROUP, groupName);
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
|
|
|
+
|
|
|
+ // Check if MROwner(user who started the mapreduce cluster) can submit job
|
|
|
+ UserGroupInformation mrOwner = UserGroupInformation.getCurrentUser();
|
|
|
+ userName = mrOwner.getShortUserName();
|
|
|
+ String[] groups = mrOwner.getGroupNames();
|
|
|
+ groupName = groups[groups.length - 1];
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
|
|
|
}
|
|
|
|
|
|
public void testUserDisabledACLForJobSubmission()
|
|
|
throws IOException, InterruptedException {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
- "3698-non-existent-user");
|
|
|
- verifyJobSubmission(conf, false);
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "3698-non-existent-user");
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
|
|
|
}
|
|
|
|
|
|
public void testDisabledACLForNonDefaultQueue()
|
|
|
throws IOException, InterruptedException {
|
|
|
// allow everyone in default queue
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "*");
|
|
|
// setup a different queue
|
|
|
conf.set("mapred.queue.names", "default,q1");
|
|
|
// setup a different acl for this queue.
|
|
|
- conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
|
|
|
+ conf.set(QueueManager.toFullPropertyName(
|
|
|
+ "q1", submitAcl), "dummy-user");
|
|
|
// verify job submission to other queue fails.
|
|
|
- verifyJobSubmission(conf, false, "q1");
|
|
|
+ verifyJobSubmission(conf, false, "user1,group1", "q1");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void testSubmissionToInvalidQueue()
|
|
|
throws IOException, InterruptedException{
|
|
|
JobConf conf = new JobConf();
|
|
@@ -144,65 +168,62 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void testEnabledACLForNonDefaultQueue()
|
|
|
throws IOException, LoginException, InterruptedException {
|
|
|
- // login as self...
|
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
- String userName = ugi.getUserName();
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ String[] groups = ugi.getGroupNames();
|
|
|
+ String userName = ugi.getShortUserName();
|
|
|
// allow everyone in default queue
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "*");
|
|
|
// setup a different queue
|
|
|
conf.set("mapred.queue.names", "default,q2");
|
|
|
// setup a different acl for this queue.
|
|
|
- conf.set("mapred.queue.q2.acl-submit-job", userName);
|
|
|
+ conf.set(QueueManager.toFullPropertyName(
|
|
|
+ "q2", submitAcl), userName);
|
|
|
// verify job submission to other queue fails.
|
|
|
- verifyJobSubmission(conf, true, "q2");
|
|
|
+ verifyJobSubmission(conf, true,
|
|
|
+ userName + "," + groups[groups.length-1], "q2");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void testUserEnabledACLForJobSubmission()
|
|
|
throws IOException, LoginException, InterruptedException {
|
|
|
- // login as self...
|
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
- String userName = ugi.getUserName();
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
- "3698-junk-user," + userName
|
|
|
+ String userName = "user1";
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "3698-junk-user," + userName
|
|
|
+ " 3698-junk-group1,3698-junk-group2");
|
|
|
- verifyJobSubmission(conf, true);
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, userName+",group1");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void testGroupsEnabledACLForJobSubmission()
|
|
|
throws IOException, LoginException, InterruptedException {
|
|
|
// login as self, get one group, and add in allowed list.
|
|
|
UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
- String[] groups = UserGroupInformation.getCurrentUser().getGroupNames();
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
|
|
|
- "3698-junk-user1,3698-junk-user2 "
|
|
|
- + groups[groups.length-1]
|
|
|
- + ",3698-junk-group");
|
|
|
- verifyJobSubmission(conf, true);
|
|
|
-
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
|
|
|
+ String[] groups = ugi.getGroupNames();
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "3698-junk-user1,3698-junk-user2 "
|
|
|
+ + groups[groups.length-1]
|
|
|
+ + ",3698-junk-group");
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true,
|
|
|
+ ugi.getShortUserName()+","+groups[groups.length-1]);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void testAllEnabledACLForJobKill()
|
|
|
throws IOException, InterruptedException {
|
|
|
UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
+ // create other user who will try to kill the job of ugi.
|
|
|
+ final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
+ createUserForTesting("user1", new String [] {"group1"});
|
|
|
+
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
|
|
|
@Override
|
|
|
public Object run() throws Exception {
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
|
|
|
- verifyJobKill(conf, true);
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", adminAcl), "*");
|
|
|
+ verifyJobKill(otherUGI, conf, true);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -211,16 +232,30 @@ public class TestQueueManager extends TestCase {
|
|
|
public void testAllDisabledACLForJobKill()
|
|
|
throws IOException, InterruptedException {
|
|
|
// Create a fake superuser for all processes to execute within
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ final UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+
|
|
|
+ // create other user who will try to kill the job of ugi.
|
|
|
+ final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
+ createUserForTesting("user1", new String [] {"group1"});
|
|
|
+
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
|
|
|
@Override
|
|
|
public Object run() throws Exception {
|
|
|
- // No one should be able to kill jobs
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
|
|
|
- // Run as dummy-user, who (obviously) is not able to kill the job,
|
|
|
- // and expect him to fail
|
|
|
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-group");
|
|
|
+ // No queue admins
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", adminAcl), " ");
|
|
|
+ // Run job as ugi and try to kill job as user1, who (obviously)
|
|
|
+ // should not be able to kill the job.
|
|
|
+ verifyJobKill(otherUGI, conf, false);
|
|
|
+
|
|
|
+ // Check if member of supergroup can kill job
|
|
|
+ conf.set(JobConf.MR_SUPERGROUP, "group1");
|
|
|
+ verifyJobKill(otherUGI, conf, true);
|
|
|
+
|
|
|
+ // Check if MROwner(user who started the mapreduce cluster) can kill job
|
|
|
+ verifyJobKill(ugi, conf, true);
|
|
|
+
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -228,16 +263,16 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
public void testOwnerAllowedForJobKill()
|
|
|
throws IOException, InterruptedException {
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ final UserGroupInformation ugi = createNecessaryUsers();
|
|
|
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
|
|
|
@Override
|
|
|
public Object run() throws Exception {
|
|
|
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
- "junk-user");
|
|
|
- verifyJobKill(conf, true);
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", adminAcl), "junk-user");
|
|
|
+ verifyJobKill(ugi, conf, true);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -246,32 +281,41 @@ public class TestQueueManager extends TestCase {
|
|
|
public void testUserDisabledACLForJobKill()
|
|
|
throws IOException, InterruptedException {
|
|
|
UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
+ // create other user who will try to kill the job of ugi.
|
|
|
+ final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
+ createUserForTesting("user1", new String [] {"group1"});
|
|
|
+
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
@Override
|
|
|
public Object run() throws Exception {
|
|
|
- //setup a cluster allowing a user to submit
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
- "dummy-user");
|
|
|
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-group");
|
|
|
+ //setup a cluster allowing a user to submit
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", adminAcl), "dummy-user");
|
|
|
+ // Run job as ugi and try to kill job as user1, who (obviously)
|
|
|
+ // should not able to kill the job.
|
|
|
+ verifyJobKill(otherUGI, conf, false);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
public void testUserEnabledACLForJobKill()
|
|
|
throws IOException, LoginException, InterruptedException {
|
|
|
UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
+ // create other user who will try to kill the job of ugi.
|
|
|
+ final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
+ createUserForTesting("user1", new String [] {"group1"});
|
|
|
+
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
@Override
|
|
|
public Object run() throws Exception {
|
|
|
- // login as self...
|
|
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
- String userName = ugi.getUserName();
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
- "dummy-user,"+userName);
|
|
|
- verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-group");
|
|
|
+ String userName = ugi.getShortUserName();
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", adminAcl), "user1");
|
|
|
+ // user1 should be able to kill the job
|
|
|
+ verifyJobKill(otherUGI, conf, true);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -280,15 +324,18 @@ public class TestQueueManager extends TestCase {
|
|
|
public void testUserDisabledForJobPriorityChange()
|
|
|
throws IOException, InterruptedException {
|
|
|
UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ // create other user who will try to change priority of the job of ugi.
|
|
|
+ final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
+ createUserForTesting("user1", new String [] {"group1"});
|
|
|
+
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
|
|
|
@Override
|
|
|
public Object run() throws Exception {
|
|
|
|
|
|
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
|
|
|
- "junk-user");
|
|
|
- verifyJobPriorityChangeAsOtherUser(conf, false,
|
|
|
- "junk-user,dummy-group");
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", adminAcl), "junk-user");
|
|
|
+ verifyJobPriorityChangeAsOtherUser(otherUGI, conf, false);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -310,23 +357,29 @@ public class TestQueueManager extends TestCase {
|
|
|
Properties hadoopConfProps = new Properties();
|
|
|
//these properties should be retained.
|
|
|
hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
|
|
|
- hadoopConfProps.put("mapred.acls.enabled", "true");
|
|
|
+ hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
|
|
|
//These property should always be overridden
|
|
|
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", "u1");
|
|
|
- hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u2");
|
|
|
- hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "u1");
|
|
|
+ hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "u1");
|
|
|
+ hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "q1", submitAcl), "u2");
|
|
|
+ hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "q2", submitAcl), "u1");
|
|
|
UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
|
|
|
|
|
|
//Actual property which would be used.
|
|
|
Properties queueConfProps = new Properties();
|
|
|
- queueConfProps.put("mapred.queue.default.acl-submit-job", " ");
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), " ");
|
|
|
//Writing out the queue configuration file.
|
|
|
UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
|
|
|
|
|
|
//Create a new configuration to be used with QueueManager
|
|
|
JobConf conf = new JobConf();
|
|
|
QueueManager queueManager = new QueueManager(conf);
|
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.
|
|
|
+ createUserForTesting("user1", new String [] {"group1"});
|
|
|
+
|
|
|
//Job Submission should fail because ugi to be used is set to blank.
|
|
|
assertFalse("User Job Submission Succeeded before refresh.",
|
|
|
queueManager.hasAccess("default", QueueManager.QueueOperation.
|
|
@@ -345,10 +398,16 @@ public class TestQueueManager extends TestCase {
|
|
|
queueManager.hasAccess("q2", QueueManager.QueueOperation.
|
|
|
SUBMIT_JOB, alternateUgi));
|
|
|
|
|
|
- //Set acl for the current user.
|
|
|
- queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
|
|
|
- queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
|
|
|
- queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
|
|
|
+ //Set acl for user1.
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "q1", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "q2", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
//write out queue-acls.xml.
|
|
|
UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
|
|
|
//refresh configuration
|
|
@@ -370,8 +429,10 @@ public class TestQueueManager extends TestCase {
|
|
|
queueConfigFile.delete();
|
|
|
|
|
|
//rewrite the mapred-site.xml
|
|
|
- hadoopConfProps.put("mapred.acls.enabled", "true");
|
|
|
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
|
|
|
+ hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
|
|
|
+ hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
|
|
|
queueManager.refreshAcls(conf);
|
|
|
assertTrue("User Job Submission failed after refresh and no queue acls file.",
|
|
@@ -397,15 +458,21 @@ public class TestQueueManager extends TestCase {
|
|
|
// queue properties with which the cluster is started.
|
|
|
Properties hadoopConfProps = new Properties();
|
|
|
hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
|
|
|
- hadoopConfProps.put("mapred.acls.enabled", "true");
|
|
|
+ hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
|
|
|
UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
|
|
|
|
|
|
//properties for mapred-queue-acls.xml
|
|
|
Properties queueConfProps = new Properties();
|
|
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
- queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
|
|
|
- queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
|
|
|
- queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "q1", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
+ queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
+ "q2", submitAcl),
|
|
|
+ ugi.getShortUserName());
|
|
|
UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
|
|
|
|
|
|
Configuration conf = new JobConf();
|
|
@@ -457,7 +524,7 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
private JobConf setupConf(String aclName, String aclValue) {
|
|
|
JobConf conf = new JobConf();
|
|
|
- conf.setBoolean("mapred.acls.enabled", true);
|
|
|
+ conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
|
|
|
conf.set(aclName, aclValue);
|
|
|
return conf;
|
|
|
}
|
|
@@ -470,21 +537,30 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void verifyJobSubmission(JobConf conf, boolean shouldSucceed)
|
|
|
- throws IOException, InterruptedException {
|
|
|
- verifyJobSubmission(conf, shouldSucceed, "default");
|
|
|
+ /**
|
|
|
+ * Verify job submission as given user to the default queue
|
|
|
+ */
|
|
|
+ private void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
|
|
|
+ String userInfo) throws IOException, InterruptedException {
|
|
|
+ verifyJobSubmission(conf, shouldSucceed, userInfo, "default");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Verify job submission as given user to the given queue
|
|
|
+ */
|
|
|
private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
|
|
|
- String queue) throws IOException, InterruptedException {
|
|
|
+ String userInfo, String queue) throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
- runAndVerifySubmission(conf, shouldSucceed, queue, null);
|
|
|
+ runAndVerifySubmission(conf, shouldSucceed, queue, userInfo);
|
|
|
} finally {
|
|
|
tearDownCluster();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Verify if submission of job to the given queue will succeed or not
|
|
|
+ */
|
|
|
private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
|
|
|
String queue, String userInfo)
|
|
|
throws IOException, InterruptedException {
|
|
@@ -519,8 +595,16 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void verifyJobKill(JobConf conf, boolean shouldSucceed)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ /**
|
|
|
+ * Submit job as current user and kill the job as user of ugi.
|
|
|
+ * @param ugi {@link UserGroupInformation} of user who tries to kill the job
|
|
|
+ * @param conf JobConf for the job
|
|
|
+ * @param shouldSucceed Should the killing of job be succeeded ?
|
|
|
+ * @throws IOException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void verifyJobKill(UserGroupInformation ugi, JobConf conf,
|
|
|
+ boolean shouldSucceed) throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
|
|
@@ -532,7 +616,20 @@ public class TestQueueManager extends TestCase {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- rjob.killJob();
|
|
|
+ conf.set("mapred.job.tracker", "localhost:"
|
|
|
+ + miniMRCluster.getJobTrackerPort());
|
|
|
+ final String jobId = rjob.getJobID();
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ RunningJob runningJob =
|
|
|
+ new JobClient(miniMRCluster.createJobConf()).getJob(jobId);
|
|
|
+ runningJob.killJob();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
while(rjob.cleanupProgress() == 0.0f) {
|
|
|
try {
|
|
|
Thread.sleep(10);
|
|
@@ -549,10 +646,9 @@ public class TestQueueManager extends TestCase {
|
|
|
if (shouldSucceed) {
|
|
|
throw ioe;
|
|
|
} else {
|
|
|
- LOG.info("exception while submitting job: " + ioe.getMessage());
|
|
|
+ LOG.info("exception while submitting/killing job: " + ioe.getMessage());
|
|
|
assertTrue(ioe.getMessage().
|
|
|
- contains("cannot perform operation " +
|
|
|
- "ADMINISTER_JOBS on queue default"));
|
|
|
+ contains(" cannot perform operation MODIFY_JOB on "));
|
|
|
}
|
|
|
} finally {
|
|
|
tearDownCluster();
|
|
@@ -584,7 +680,7 @@ public class TestQueueManager extends TestCase {
|
|
|
throw ioe;
|
|
|
}
|
|
|
//verify it fails
|
|
|
- LOG.info("exception while submitting job: " + ioe.getMessage());
|
|
|
+ LOG.info("exception while killing job: " + ioe.getMessage());
|
|
|
assertTrue(ioe.getMessage().
|
|
|
contains("cannot perform operation " +
|
|
|
"ADMINISTER_JOBS on queue default"));
|
|
@@ -602,32 +698,51 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void verifyJobPriorityChangeAsOtherUser(JobConf conf,
|
|
|
- boolean shouldSucceed, String otherUserInfo)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ /**
|
|
|
+ * Submit job as current user and try to change priority of that job as
|
|
|
+ * another user.
|
|
|
+ * @param otherUGI user who will try to change priority of job
|
|
|
+ * @param conf jobConf for the job
|
|
|
+ * @param shouldSucceed Should the changing of priority of job be succeeded ?
|
|
|
+ * @throws IOException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
|
|
|
+ JobConf conf, final boolean shouldSucceed)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
- // submit job as another user.
|
|
|
- String userInfo = otherUserInfo;
|
|
|
- RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
|
|
|
+ // submit job as current user.
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ String[] groups = ugi.getGroupNames();
|
|
|
+ String userInfo = ugi.getShortUserName() + "," +
|
|
|
+ groups[groups.length - 1];
|
|
|
+ final RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
|
|
|
assertFalse(rjob.isComplete());
|
|
|
|
|
|
- // try to change priority as self
|
|
|
- try {
|
|
|
- conf.set("mapred.job.tracker", "localhost:"
|
|
|
- + miniMRCluster.getJobTrackerPort());
|
|
|
- JobClient client = new JobClient(miniMRCluster.createJobConf());
|
|
|
- client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
|
|
|
- if (!shouldSucceed) {
|
|
|
- fail("changing priority should fail.");
|
|
|
+ conf.set("mapred.job.tracker", "localhost:"
|
|
|
+ + miniMRCluster.getJobTrackerPort());
|
|
|
+ // try to change priority as other user
|
|
|
+ otherUGI.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ try {
|
|
|
+ JobClient client = new JobClient(miniMRCluster.createJobConf());
|
|
|
+ client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
|
|
|
+ if (!shouldSucceed) {
|
|
|
+ fail("changing priority should fail.");
|
|
|
+ }
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ //verify it fails
|
|
|
+ LOG.info("exception while changing priority of job: " +
|
|
|
+ ioe.getMessage());
|
|
|
+ assertTrue(ioe.getMessage().
|
|
|
+ contains(" cannot perform operation MODIFY_JOB on "));
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
- } catch (IOException ioe) {
|
|
|
- //verify it fails
|
|
|
- LOG.info("exception while submitting job: " + ioe.getMessage());
|
|
|
- assertTrue(ioe.getMessage().
|
|
|
- contains("cannot perform operation " +
|
|
|
- "ADMINISTER_JOBS on queue default"));
|
|
|
- }
|
|
|
+ });
|
|
|
//wait for job to complete on its own
|
|
|
while (!rjob.isComplete()) {
|
|
|
try {
|