|
@@ -18,12 +18,8 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import java.io.File;
|
|
|
-import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
-import java.io.PrintWriter;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
-import java.util.Properties;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
@@ -33,7 +29,6 @@ import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.examples.SleepJob;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -48,8 +43,8 @@ public class TestQueueManager extends TestCase {
|
|
|
String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
|
|
|
String adminAcl = QueueACL.ADMINISTER_JOBS.getAclName();
|
|
|
|
|
|
- private MiniDFSCluster miniDFSCluster;
|
|
|
- private MiniMRCluster miniMRCluster;
|
|
|
+ MiniDFSCluster miniDFSCluster;
|
|
|
+ MiniMRCluster miniMRCluster = null;
|
|
|
|
|
|
/**
|
|
|
* For some tests it is necessary to sandbox them in a doAs with a fake user
|
|
@@ -57,7 +52,7 @@ public class TestQueueManager extends TestCase {
|
|
|
* necessary to then add the real user running the test to the fake users
|
|
|
* so that child processes can write to the DFS.
|
|
|
*/
|
|
|
- private UserGroupInformation createNecessaryUsers() throws IOException {
|
|
|
+ UserGroupInformation createNecessaryUsers() throws IOException {
|
|
|
// Add real user to fake groups mapping so that child processes (tasks)
|
|
|
// will have permissions on the dfs
|
|
|
String j = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
@@ -102,419 +97,102 @@ public class TestQueueManager extends TestCase {
|
|
|
|
|
|
public void testAllEnabledACLForJobSubmission()
|
|
|
throws IOException, InterruptedException {
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "*");
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
- String[] groups = ugi.getGroupNames();
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, true,
|
|
|
- ugi.getShortUserName() + "," + groups[groups.length-1]);
|
|
|
+ try {
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "*");
|
|
|
+ UserGroupInformation ugi = createNecessaryUsers();
|
|
|
+ String[] groups = ugi.getGroupNames();
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true,
|
|
|
+ ugi.getShortUserName() + "," + groups[groups.length-1]);
|
|
|
+ } finally {
|
|
|
+ tearDownCluster();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testAllDisabledACLForJobSubmission()
|
|
|
throws IOException, InterruptedException {
|
|
|
- createNecessaryUsers();
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), " ");
|
|
|
- String userName = "user1";
|
|
|
- String groupName = "group1";
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);
|
|
|
+ try {
|
|
|
+ createNecessaryUsers();
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), " ");
|
|
|
+ String userName = "user1";
|
|
|
+ String groupName = "group1";
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);
|
|
|
|
|
|
- // Check if admins can submit job
|
|
|
- String user2 = "user2";
|
|
|
- String group2 = "group2";
|
|
|
- conf.set(JobConf.MR_ADMINS, user2 + " " + groupName);
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, true, user2 + "," + group2);
|
|
|
+ // Check if admins can submit job
|
|
|
+ String user2 = "user2";
|
|
|
+ String group2 = "group2";
|
|
|
+ conf.set(JobConf.MR_ADMINS, user2 + " " + groupName);
|
|
|
+ tearDownCluster();
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, user2 + "," + group2);
|
|
|
|
|
|
- // Check if MROwner(user who started the mapreduce cluster) can submit job
|
|
|
- UserGroupInformation mrOwner = UserGroupInformation.getCurrentUser();
|
|
|
- userName = mrOwner.getShortUserName();
|
|
|
- String[] groups = mrOwner.getGroupNames();
|
|
|
- groupName = groups[groups.length - 1];
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
|
|
|
+ // Check if MROwner(user who started the mapreduce cluster) can submit job
|
|
|
+ UserGroupInformation mrOwner = UserGroupInformation.getCurrentUser();
|
|
|
+ userName = mrOwner.getShortUserName();
|
|
|
+ String[] groups = mrOwner.getGroupNames();
|
|
|
+ groupName = groups[groups.length - 1];
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
|
|
|
+ } finally {
|
|
|
+ tearDownCluster();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testUserDisabledACLForJobSubmission()
|
|
|
throws IOException, InterruptedException {
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "3698-non-existent-user");
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
|
|
|
- }
|
|
|
-
|
|
|
- public void testDisabledACLForNonDefaultQueue()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- // allow everyone in default queue
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "*");
|
|
|
- // setup a different queue
|
|
|
- conf.set("mapred.queue.names", "default,q1");
|
|
|
- // setup a different acl for this queue.
|
|
|
- conf.set(QueueManager.toFullPropertyName(
|
|
|
- "q1", submitAcl), "dummy-user");
|
|
|
- // verify job submission to other queue fails.
|
|
|
- verifyJobSubmission(conf, false, "user1,group1", "q1");
|
|
|
+ try {
|
|
|
+ JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
+ "default", submitAcl), "3698-non-existent-user");
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
|
|
|
+ } finally {
|
|
|
+ tearDownCluster();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testSubmissionToInvalidQueue()
|
|
|
throws IOException, InterruptedException{
|
|
|
+ try {
|
|
|
JobConf conf = new JobConf();
|
|
|
conf.set("mapred.queue.names","default");
|
|
|
setUpCluster(conf);
|
|
|
String queueName = "q1";
|
|
|
try {
|
|
|
- RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
|
|
|
+ submitSleepJob(1, 1, 100, 100, true, null, queueName);
|
|
|
} catch (IOException ioe) {
|
|
|
assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
|
|
|
return;
|
|
|
} finally {
|
|
|
tearDownCluster();
|
|
|
}
|
|
|
- fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
|
|
|
- }
|
|
|
-
|
|
|
- public void testEnabledACLForNonDefaultQueue()
|
|
|
- throws IOException, LoginException, InterruptedException {
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
- String[] groups = ugi.getGroupNames();
|
|
|
- String userName = ugi.getShortUserName();
|
|
|
- // allow everyone in default queue
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "*");
|
|
|
- // setup a different queue
|
|
|
- conf.set("mapred.queue.names", "default,q2");
|
|
|
- // setup a different acl for this queue.
|
|
|
- conf.set(QueueManager.toFullPropertyName(
|
|
|
- "q2", submitAcl), userName);
|
|
|
- // verify job submission to other queue fails.
|
|
|
- verifyJobSubmission(conf, true,
|
|
|
- userName + "," + groups[groups.length-1], "q2");
|
|
|
+ fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
|
|
|
+ } finally {
|
|
|
+ tearDownCluster();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testUserEnabledACLForJobSubmission()
|
|
|
throws IOException, LoginException, InterruptedException {
|
|
|
- String userName = "user1";
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "3698-junk-user," + userName
|
|
|
- + " 3698-junk-group1,3698-junk-group2");
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, true, userName+",group1");
|
|
|
- }
|
|
|
-
|
|
|
- public void testGroupsEnabledACLForJobSubmission()
|
|
|
- throws IOException, LoginException, InterruptedException {
|
|
|
- // login as self, get one group, and add in allowed list.
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
- String[] groups = ugi.getGroupNames();
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "3698-junk-user1,3698-junk-user2 "
|
|
|
- + groups[groups.length-1]
|
|
|
- + ",3698-junk-group");
|
|
|
- verifyJobSubmissionToDefaultQueue(conf, true,
|
|
|
- ugi.getShortUserName()+","+groups[groups.length-1]);
|
|
|
- }
|
|
|
-
|
|
|
- public void testAllEnabledACLForJobKill()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
- // create other user who will try to kill the job of ugi.
|
|
|
- final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
- createUserForTesting("user1", new String [] {"group1"});
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", adminAcl), "*");
|
|
|
- verifyJobKill(otherUGI, conf, true);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public void testAllDisabledACLForJobKill()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- // Create a fake superuser for all processes to execute within
|
|
|
- final UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
- // create other users who will try to kill the job of ugi.
|
|
|
- final UserGroupInformation otherUGI1 = UserGroupInformation.
|
|
|
- createUserForTesting("user1", new String [] {"group1"});
|
|
|
- final UserGroupInformation otherUGI2 = UserGroupInformation.
|
|
|
- createUserForTesting("user2", new String [] {"group2"});
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
- // No queue admins
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", adminAcl), " ");
|
|
|
- // Run job as ugi and try to kill job as user1, who (obviously)
|
|
|
- // should not be able to kill the job.
|
|
|
- verifyJobKill(otherUGI1, conf, false);
|
|
|
- verifyJobKill(otherUGI2, conf, false);
|
|
|
-
|
|
|
- // Check if cluster administrator can kill job
|
|
|
- conf.set(JobConf.MR_ADMINS, "user1 group2");
|
|
|
- verifyJobKill(otherUGI1, conf, true);
|
|
|
- verifyJobKill(otherUGI2, conf, true);
|
|
|
-
|
|
|
- // Check if MROwner(user who started the mapreduce cluster) can kill job
|
|
|
- verifyJobKill(ugi, conf, true);
|
|
|
-
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public void testOwnerAllowedForJobKill()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- final UserGroupInformation ugi = createNecessaryUsers();
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
-
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", adminAcl), "junk-user");
|
|
|
- verifyJobKill(ugi, conf, true);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public void testUserDisabledACLForJobKill()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
- // create other user who will try to kill the job of ugi.
|
|
|
- final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
- createUserForTesting("user1", new String [] {"group1"});
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
- //setup a cluster allowing a user to submit
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", adminAcl), "dummy-user");
|
|
|
- // Run job as ugi and try to kill job as user1, who (obviously)
|
|
|
- // should not able to kill the job.
|
|
|
- verifyJobKill(otherUGI, conf, false);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public void testUserEnabledACLForJobKill()
|
|
|
- throws IOException, LoginException, InterruptedException {
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
- // create other user who will try to kill the job of ugi.
|
|
|
- final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
- createUserForTesting("user1", new String [] {"group1"});
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
- String userName = ugi.getShortUserName();
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", adminAcl), "user1");
|
|
|
- // user1 should be able to kill the job
|
|
|
- verifyJobKill(otherUGI, conf, true);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public void testUserDisabledForJobPriorityChange()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- UserGroupInformation ugi = createNecessaryUsers();
|
|
|
- // create other user who will try to change priority of the job of ugi.
|
|
|
- final UserGroupInformation otherUGI = UserGroupInformation.
|
|
|
- createUserForTesting("user1", new String [] {"group1"});
|
|
|
-
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
-
|
|
|
- JobConf conf = setupConf(QueueManager.toFullPropertyName(
|
|
|
- "default", adminAcl), "junk-user");
|
|
|
- verifyJobPriorityChangeAsOtherUser(otherUGI, conf, false);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Test to verify refreshing of queue properties by using MRAdmin tool.
|
|
|
- *
|
|
|
- * @throws Exception
|
|
|
- */
|
|
|
- public void testACLRefresh() throws Exception {
|
|
|
- String queueConfigPath =
|
|
|
- System.getProperty("test.build.extraconf", "build/test/extraconf");
|
|
|
- File queueConfigFile =
|
|
|
- new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
|
|
|
- File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
|
|
|
try {
|
|
|
- //Setting up default mapred-site.xml
|
|
|
- Properties hadoopConfProps = new Properties();
|
|
|
- //these properties should be retained.
|
|
|
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
|
|
|
- hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
|
|
|
- //These property should always be overridden
|
|
|
- hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), "u1");
|
|
|
- hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q1", submitAcl), "u2");
|
|
|
- hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q2", submitAcl), "u1");
|
|
|
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
|
|
|
-
|
|
|
- //Actual property which would be used.
|
|
|
- Properties queueConfProps = new Properties();
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), " ");
|
|
|
- //Writing out the queue configuration file.
|
|
|
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
|
|
|
-
|
|
|
- //Create a new configuration to be used with QueueManager
|
|
|
- JobConf conf = new JobConf();
|
|
|
- QueueManager queueManager = new QueueManager(conf);
|
|
|
- UserGroupInformation ugi = UserGroupInformation.
|
|
|
- createUserForTesting("user1", new String [] {"group1"});
|
|
|
-
|
|
|
- //Job Submission should fail because ugi to be used is set to blank.
|
|
|
- assertFalse("User Job Submission Succeeded before refresh.",
|
|
|
- queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertFalse("User Job Submission Succeeded before refresh.",
|
|
|
- queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertFalse("User Job Submission Succeeded before refresh.",
|
|
|
- queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
|
|
|
-
|
|
|
- //Test job submission as alternate user.
|
|
|
- UserGroupInformation alternateUgi =
|
|
|
- UserGroupInformation.createUserForTesting("u1", new String[]{"user"});
|
|
|
- assertTrue("Alternate User Job Submission failed before refresh.",
|
|
|
- queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
|
|
|
-
|
|
|
- //Set acl for user1.
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), ugi.getShortUserName());
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q1", submitAcl), ugi.getShortUserName());
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q2", submitAcl), ugi.getShortUserName());
|
|
|
- //write out queue-acls.xml.
|
|
|
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
|
|
|
- //refresh configuration
|
|
|
- queueManager.refreshAcls(conf);
|
|
|
- //Submission should succeed
|
|
|
- assertTrue("User Job Submission failed after refresh.",
|
|
|
- queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertTrue("User Job Submission failed after refresh.",
|
|
|
- queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertTrue("User Job Submission failed after refresh.",
|
|
|
- queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertFalse("Alternate User Job Submission succeeded after refresh.",
|
|
|
- queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
|
|
|
- //delete the ACL file.
|
|
|
- queueConfigFile.delete();
|
|
|
-
|
|
|
- //rewrite the mapred-site.xml
|
|
|
- hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
|
|
|
- hadoopConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q1", submitAcl), ugi.getShortUserName());
|
|
|
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
|
|
|
- queueManager.refreshAcls(conf);
|
|
|
- assertTrue("User Job Submission allowed after refresh and no queue acls file.",
|
|
|
- queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- } finally{
|
|
|
- if(queueConfigFile.exists()) {
|
|
|
- queueConfigFile.delete();
|
|
|
- }
|
|
|
- if(hadoopConfigFile.exists()) {
|
|
|
- hadoopConfigFile.delete();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
|
|
|
- String queueConfigPath =
|
|
|
- System.getProperty("test.build.extraconf", "build/test/extraconf");
|
|
|
- File queueConfigFile =
|
|
|
- new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
|
|
|
- File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
|
|
|
- try {
|
|
|
- // queue properties with which the cluster is started.
|
|
|
- Properties hadoopConfProps = new Properties();
|
|
|
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
|
|
|
- hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
|
|
|
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
|
|
|
-
|
|
|
- //properties for mapred-queue-acls.xml
|
|
|
- Properties queueConfProps = new Properties();
|
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "default", submitAcl), ugi.getShortUserName());
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q1", submitAcl), ugi.getShortUserName());
|
|
|
- queueConfProps.put(QueueManager.toFullPropertyName(
|
|
|
- "q2", submitAcl), ugi.getShortUserName());
|
|
|
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
|
|
|
-
|
|
|
- Configuration conf = new JobConf();
|
|
|
- QueueManager queueManager = new QueueManager(conf);
|
|
|
- //Testing access to queue.
|
|
|
- assertTrue("User Job Submission failed.",
|
|
|
- queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertTrue("User Job Submission failed.",
|
|
|
- queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertTrue("User Job Submission failed.",
|
|
|
- queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
|
|
|
-
|
|
|
- //Write out a new incomplete invalid configuration file.
|
|
|
- PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
|
|
|
- writer.println("<configuration>");
|
|
|
- writer.println("<property>");
|
|
|
- writer.flush();
|
|
|
- writer.close();
|
|
|
- try {
|
|
|
- //Exception to be thrown by queue manager because configuration passed
|
|
|
- //is invalid.
|
|
|
- queueManager.refreshAcls(conf);
|
|
|
- fail("Refresh of ACLs should have failed with invalid conf file.");
|
|
|
- } catch (Exception e) {
|
|
|
- }
|
|
|
- assertTrue("User Job Submission failed after invalid conf file refresh.",
|
|
|
- queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertTrue("User Job Submission failed after invalid conf file refresh.",
|
|
|
- queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
|
|
|
- assertTrue("User Job Submission failed after invalid conf file refresh.",
|
|
|
- queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
|
|
|
+ String userName = "user1";
|
|
|
+ JobConf conf
|
|
|
+ = setupConf(QueueManager.toFullPropertyName
|
|
|
+ ("default", submitAcl), "3698-junk-user," + userName
|
|
|
+ + " 3698-junk-group1,3698-junk-group2");
|
|
|
+ verifyJobSubmissionToDefaultQueue(conf, true, userName+",group1");
|
|
|
} finally {
|
|
|
- //Cleanup the configuration files in all cases
|
|
|
- if(hadoopConfigFile.exists()) {
|
|
|
- hadoopConfigFile.delete();
|
|
|
- }
|
|
|
- if(queueConfigFile.exists()) {
|
|
|
- queueConfigFile.delete();
|
|
|
- }
|
|
|
+ tearDownCluster();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
- private JobConf setupConf(String aclName, String aclValue) {
|
|
|
+ JobConf setupConf(String aclName, String aclValue) {
|
|
|
JobConf conf = new JobConf();
|
|
|
conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
|
|
|
conf.set(aclName, aclValue);
|
|
|
return conf;
|
|
|
}
|
|
|
|
|
|
- private void verifyQueues(Set<String> expectedQueues,
|
|
|
+ void verifyQueues(Set<String> expectedQueues,
|
|
|
Set<String> actualQueues) {
|
|
|
assertEquals(expectedQueues.size(), actualQueues.size());
|
|
|
for (String queue : expectedQueues) {
|
|
@@ -525,7 +203,7 @@ public class TestQueueManager extends TestCase {
|
|
|
/**
|
|
|
* Verify job submission as given user to the default queue
|
|
|
*/
|
|
|
- private void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
|
|
|
+ void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
|
|
|
String userInfo) throws IOException, InterruptedException {
|
|
|
verifyJobSubmission(conf, shouldSucceed, userInfo, "default");
|
|
|
}
|
|
@@ -533,20 +211,20 @@ public class TestQueueManager extends TestCase {
|
|
|
/**
|
|
|
* Verify job submission as given user to the given queue
|
|
|
*/
|
|
|
- private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
|
|
|
+ void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
|
|
|
String userInfo, String queue) throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
|
runAndVerifySubmission(conf, shouldSucceed, queue, userInfo);
|
|
|
} finally {
|
|
|
- tearDownCluster();
|
|
|
+ // tearDownCluster();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Verify if submission of job to the given queue will succeed or not
|
|
|
*/
|
|
|
- private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
|
|
|
+ void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
|
|
|
String queue, String userInfo)
|
|
|
throws IOException, InterruptedException {
|
|
|
try {
|
|
@@ -576,9 +254,9 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- tearDownCluster();
|
|
|
+ // tearDownCluster();
|
|
|
}
|
|
|
-}
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Submit job as current user and kill the job as user of ugi.
|
|
@@ -588,7 +266,7 @@ public class TestQueueManager extends TestCase {
|
|
|
* @throws IOException
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
- private void verifyJobKill(UserGroupInformation ugi, JobConf conf,
|
|
|
+ void verifyJobKill(UserGroupInformation ugi, JobConf conf,
|
|
|
boolean shouldSucceed) throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
|
try {
|
|
@@ -636,12 +314,12 @@ public class TestQueueManager extends TestCase {
|
|
|
contains(" cannot perform operation KILL_JOB on "));
|
|
|
}
|
|
|
} finally {
|
|
|
- tearDownCluster();
|
|
|
+ // tearDownCluster();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
- private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
|
|
|
+ void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
|
|
|
String otherUserInfo)
|
|
|
throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
@@ -679,7 +357,7 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- tearDownCluster();
|
|
|
+ // tearDownCluster();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -692,7 +370,7 @@ public class TestQueueManager extends TestCase {
|
|
|
* @throws IOException
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
- private void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
|
|
|
+ void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
|
|
|
JobConf conf, final boolean shouldSucceed)
|
|
|
throws IOException, InterruptedException {
|
|
|
setUpCluster(conf);
|
|
@@ -737,28 +415,42 @@ public class TestQueueManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- tearDownCluster();
|
|
|
+ // tearDownCluster();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void setUpCluster(JobConf conf) throws IOException {
|
|
|
- miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
|
|
|
- FileSystem fileSys = miniDFSCluster.getFileSystem();
|
|
|
- TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
|
|
|
- TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys,
|
|
|
- conf.get("mapreduce.jobtracker.staging.root.dir",
|
|
|
- "/tmp/hadoop/mapred/staging"));
|
|
|
- String namenode = fileSys.getUri().toString();
|
|
|
- miniMRCluster = new MiniMRCluster(1, namenode, 3,
|
|
|
- null, null, conf);
|
|
|
+ void setUpCluster(JobConf conf) throws IOException {
|
|
|
+ if(miniMRCluster == null) {
|
|
|
+ miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
|
|
|
+ FileSystem fileSys = miniDFSCluster.getFileSystem();
|
|
|
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
|
|
|
+ TestMiniMRWithDFSWithDistinctUsers.mkdir
|
|
|
+ (fileSys, conf.get("mapreduce.jobtracker.staging.root.dir",
|
|
|
+ "/tmp/hadoop/mapred/staging"));
|
|
|
+ String namenode = fileSys.getUri().toString();
|
|
|
+ miniMRCluster = new MiniMRCluster(1, namenode, 3,
|
|
|
+ null, null, conf);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private void tearDownCluster() throws IOException {
|
|
|
- if (miniMRCluster != null) { miniMRCluster.shutdown(); }
|
|
|
- if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
|
|
|
+ void tearDownCluster() throws IOException {
|
|
|
+ if (miniMRCluster != null) {
|
|
|
+ long mrTeardownStart = new java.util.Date().getTime();
|
|
|
+ if (miniMRCluster != null) { miniMRCluster.shutdown(); }
|
|
|
+ long mrTeardownEnd = new java.util.Date().getTime();
|
|
|
+ if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
|
|
|
+ long dfsTeardownEnd = new java.util.Date().getTime();
|
|
|
+ miniMRCluster = null;
|
|
|
+ miniDFSCluster = null;
|
|
|
+ System.err.println("An MR teardown took "
|
|
|
+ + (mrTeardownEnd - mrTeardownStart)
|
|
|
+ + " milliseconds. A DFS teardown took "
|
|
|
+ + ( dfsTeardownEnd - mrTeardownEnd )
|
|
|
+ + " milliseconds.");
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- private RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
+
|
|
|
+ RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
long mapSleepTime, long reduceSleepTime,
|
|
|
boolean shouldComplete)
|
|
|
throws IOException, InterruptedException {
|
|
@@ -766,7 +458,7 @@ public class TestQueueManager extends TestCase {
|
|
|
reduceSleepTime, shouldComplete, null);
|
|
|
}
|
|
|
|
|
|
- private RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
+ RunningJob submitSleepJob(int numMappers, int numReducers,
|
|
|
long mapSleepTime, long reduceSleepTime,
|
|
|
boolean shouldComplete, String userInfo)
|
|
|
throws IOException, InterruptedException {
|
|
@@ -774,7 +466,7 @@ public class TestQueueManager extends TestCase {
|
|
|
reduceSleepTime, shouldComplete, userInfo, null);
|
|
|
}
|
|
|
|
|
|
- private RunningJob submitSleepJob(final int numMappers, final int numReducers,
|
|
|
+ RunningJob submitSleepJob(final int numMappers, final int numReducers,
|
|
|
final long mapSleepTime,
|
|
|
final long reduceSleepTime, final boolean shouldComplete, String userInfo,
|
|
|
String queueName)
|