@@ -18,7 +18,11 @@
 package org.apache.hadoop.mapred;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,13 +32,13 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestQueueManager extends TestCase {
 
@@ -195,6 +199,169 @@ public class TestQueueManager extends TestCase {
     verifyJobPriorityChangeAsOtherUser(conf, false,
         "junk-user,junk-user-group");
   }
+
+  /**
+   * Test to verify refreshing of queue properties by using the MRAdmin tool.
+   *
+   * @throws Exception
+   */
+  public void testACLRefresh() throws Exception {
+    String queueConfigPath =
+        System.getProperty("test.build.extraconf", "build/test/extraconf");
+    File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+    try {
+      //Setting up default mapred-site.xml
+      Properties hadoopConfProps = new Properties();
+      //these properties should be retained.
+      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      //These properties should always be overridden.
+      hadoopConfProps.put("mapred.queue.default.acl-submit-job", "u1");
+      hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u2");
+      hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "u1");
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+
+      //Actual property which would be used.
+      Properties queueConfProps = new Properties();
+      queueConfProps.put("mapred.queue.default.acl-submit-job", " ");
+      //Writing out the queue configuration file.
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+
+      //Create a new configuration to be used with QueueManager
+      JobConf conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
+      //Job Submission should fail because the ACLs do not allow the current user.
+      assertFalse("User Job Submission Succeeded before refresh.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse("User Job Submission Succeeded before refresh.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse("User Job Submission Succeeded before refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+
+      //Test job submission as alternate user.
+      Configuration alternateUserConfig = new Configuration();
+      alternateUserConfig.set("hadoop.job.ugi", "u1,users");
+      UserGroupInformation alternateUgi =
+          UserGroupInformation.readFrom(alternateUserConfig);
+      assertTrue("Alternate User Job Submission failed before refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, alternateUgi));
+
+      //Set acl for the current user.
+      queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
+      //write out queue-acls.xml.
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      //refresh configuration
+      queueManager.refreshAcls(conf);
+      //Submission should succeed
+      assertTrue("User Job Submission failed after refresh.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after refresh.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse("Alternate User Job Submission succeeded after refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, alternateUgi));
+      //delete the ACL file.
+      queueConfigFile.delete();
+
+      //rewrite the mapred-site.xml
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      queueManager.refreshAcls(conf);
+      assertTrue("User Job Submission failed after refresh and no queue acls file.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+    } finally {
+      if (queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+      if (hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+    }
+  }
+
+  public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
+    String queueConfigPath =
+        System.getProperty("test.build.extraconf", "build/test/extraconf");
+    File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+    File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
+    try {
+      //Queue properties with which the cluster is started.
+      Properties hadoopConfProps = new Properties();
+      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+      hadoopConfProps.put("mapred.acls.enabled", "true");
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+
+      //Properties for mapred-queue-acls.xml
+      Properties queueConfProps = new Properties();
+      UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
+      queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
+      queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+
+      Configuration conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      //Testing access to queue.
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+
+      //Write out a new, incomplete and invalid configuration file.
+      PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
+      writer.println("<configuration>");
+      writer.println("<property>");
+      writer.flush();
+      writer.close();
+      try {
+        //The queue manager should throw an exception because the
+        //configuration passed to it is invalid.
+        queueManager.refreshAcls(conf);
+        fail("Refresh of ACLs should have failed with invalid conf file.");
+      } catch (Exception e) {
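+        //Expected: the refresh is rejected and the previously loaded ACLs
+        //remain in effect, which the assertions below verify.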
+      }
+      assertTrue("User Job Submission failed after invalid conf file refresh.",
+          queueManager.hasAccess("default", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after invalid conf file refresh.",
+          queueManager.hasAccess("q1", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue("User Job Submission failed after invalid conf file refresh.",
+          queueManager.hasAccess("q2", QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+    } finally {
+      //Cleanup the configuration files in all cases
+      if (hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+      if (queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+    }
+  }
+
 
   private JobConf setupConf(String aclName, String aclValue) {
     JobConf conf = new JobConf();
@@ -217,10 +384,20 @@ public class TestQueueManager extends TestCase {
   }
 
   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
-                                   String queue) throws IOException {
+      String queue) throws IOException {
     setUpCluster(conf);
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
+      runAndVerifySubmission(conf, shouldSucceed, queue, null);
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
+      String queue, String userInfo)
+      throws IOException {
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
       if (shouldSucceed) {
         assertTrue(rjob.isSuccessful());
       } else {
@@ -411,14 +588,14 @@
     if (queueName != null) {
      clientConf.setQueueName(queueName);
     }
+    JobConf jc = new JobConf(clientConf);
+    if (userInfo != null) {
+      jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+    }
     RunningJob rJob = null;
     if (shouldComplete) {
-      rJob = JobClient.runJob(clientConf);
+      rJob = JobClient.runJob(jc);
     } else {
-      JobConf jc = new JobConf(clientConf);
-      if (userInfo != null) {
-        jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
-      }
      rJob = new JobClient(clientConf).submitJob(jc);
     }
     return rJob;