Explorar el Código

HADOOP-4124. Added a command-line switch to allow users to set job priorities, also allow it to be manipulated via the web-ui. Contributed by Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@697219 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy hace 17 años
padre
commit
ed17d5bf56

+ 4 - 0
CHANGES.txt

@@ -382,6 +382,10 @@ Trunk (unreleased changes)
     HADOOP-3975. Change test-patch script to report working the dir
     modifications preventing the suite from being run. (Ramya R via cdouglas)
 
+    HADOOP-4124. Added a command-line switch to allow users to set job
+    priorities, also allow it to be manipulated via the web-ui. (Hemanth
+    Yamijala via acmurthy) 
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 6 - 6
src/mapred/org/apache/hadoop/mapred/JSPUtil.java

@@ -55,7 +55,8 @@ class JSPUtil {
       }
     }
 
-    if (request.getParameter("changeJobPriority") != null) {
+    if (conf.getBoolean(PRIVATE_ACTIONS_KEY, false) && 
+          request.getParameter("changeJobPriority") != null) {
       String[] jobs = request.getParameterValues("jobCheckBox");
 
       if (jobs != null) {
@@ -82,7 +83,9 @@ class JSPUtil {
   public static String generateJobTable(String label, Vector<JobInProgress> jobs
       , int refresh, int rowId) throws IOException {
 
-    boolean isModifiable = label.equals("Running");
+    boolean isModifiable = label.equals("Running") 
+                                && conf.getBoolean(
+                                      PRIVATE_ACTIONS_KEY, false);
     StringBuffer sb = new StringBuffer();
     
     sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
@@ -94,10 +97,7 @@ class JSPUtil {
         sb.append("<td><input type=\"Button\" onclick=\"selectAll()\" " +
         		"value=\"Select All\" id=\"checkEm\"></td>");
         sb.append("<td>");
-        if (conf.getBoolean(PRIVATE_ACTIONS_KEY, false)) {
-         sb.append("<input type=\"submit\" name=\"killJobs\" " +
-         		"value=\"Kill Selected Jobs\">");
-        }
+        sb.append("<input type=\"submit\" name=\"killJobs\" value=\"Kill Selected Jobs\">");
         sb.append("</td");
         sb.append("<td><nobr>");
         sb.append("<select name=\"setJobPriority\">");

+ 54 - 4
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -297,6 +297,15 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     public synchronized void killJob() throws IOException {
       jobSubmitClient.killJob(getID());
     }
+   
+    
+    /** Set the priority of the job.
+    * @param priority new priority of the job. 
+    */
+    public synchronized void setJobPriority(String priority) 
+                                                throws IOException {
+      jobSubmitClient.setJobPriority(getID(), priority);
+    }
     
     /**
      * Kill indicated task attempt.
@@ -1297,11 +1306,20 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     return this.taskOutputFilter; 
   }
 
+  private String getJobPriorityNames() {
+    StringBuffer sb = new StringBuffer();
+    for (JobPriority p : JobPriority.values()) {
+      sb.append(p.name()).append(" ");
+    }
+    return sb.substring(0, sb.length()-1);
+  }
+  
   /**
    * Display usage of the command-line tool and terminate execution
    */
   private void displayUsage(String cmd) {
     String prefix = "Usage: JobClient ";
+    String jobPriorityValues = getJobPriorityNames();
     if("-submit".equals(cmd)) {
       System.err.println(prefix + "[" + cmd + " <job-file>]");
     } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
@@ -1316,12 +1334,19 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       System.err.println(prefix + "[" + cmd + " [all]]");
     } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
       System.err.println(prefix + "[" + cmd + " <task-id>]");
+    } else if ("-set-priority".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
+          "Valid values for priorities are: " 
+          + jobPriorityValues); 
     } else {
       System.err.printf(prefix + "<command> <args>\n");
       System.err.printf("\t[-submit <job-file>]\n");
       System.err.printf("\t[-status <job-id>]\n");
       System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
       System.err.printf("\t[-kill <job-id>]\n");
+      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
+                                      "Valid values for priorities are: " +
+                                      jobPriorityValues + "\n");
       System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
       System.err.printf("\t[-history <jobOutputDir>]\n");
       System.err.printf("\t[-list [all]]\n");
@@ -1345,6 +1370,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     String outputDir = null;
     String counterGroupName = null;
     String counterName = null;
+    String newPriority = null;
     int fromEvent = 0;
     int nEvents = 0;
     boolean getStatus = false;
@@ -1357,6 +1383,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     boolean listAllJobs = false;
     boolean killTask = false;
     boolean failTask = false;
+    boolean setJobPriority = false;
 
     if ("-submit".equals(cmd)) {
       if (argv.length != 2) {
@@ -1387,6 +1414,20 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       }
       jobid = argv[1];
       killJob = true;
+    } else if ("-set-priority".equals(cmd)) {
+      if (argv.length != 3) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      jobid = argv[1];
+      newPriority = argv[2];
+      try {
+        JobPriority jp = JobPriority.valueOf(newPriority); 
+      } catch (IllegalArgumentException iae) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      setJobPriority = true; 
     } else if ("-events".equals(cmd)) {
       if (argv.length != 4) {
         displayUsage(cmd);
@@ -1482,6 +1523,15 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           System.out.println("Killed job " + jobid);
           exitCode = 0;
         }
+      } else if (setJobPriority) {
+        RunningJob job = getJob(JobID.forName(jobid));
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          job.setJobPriority(newPriority);
+          System.out.println("Changed job priority.");
+          exitCode = 0;
+        } 
       } else if (viewHistory) {
         viewHistory(outputDir, viewAllHistory);
         exitCode = 0;
@@ -1553,7 +1603,6 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       jobs = new JobStatus[0];
 
     System.out.printf("%d jobs currently running\n", jobs.length);
-    System.out.printf("JobId\tState\tStartTime\tUserName\n");
     displayJobList(jobs);
   }
     
@@ -1572,10 +1621,11 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
 
   void displayJobList(JobStatus[] jobs) {
-    System.out.printf("JobId\tState\tStartTime\tUserName\tSchedulingInfo\n");
+    System.out.printf("JobId\tState\tStartTime\tUserName\tPriority\tSchedulingInfo\n");
     for (JobStatus job : jobs) {
-      System.out.printf("%s\t%d\t%d\t%s\t%s\n", job.getJobID(), job.getRunState(),
-          job.getStartTime(), job.getUsername(),job.getSchedulingInfo());
+      System.out.printf("%s\t%d\t%d\t%s\t%s\t%s\n", job.getJobID(), job.getRunState(),
+          job.getStartTime(), job.getUsername(), 
+          job.getJobPriority().name(), job.getSchedulingInfo());
     }
   }
 

+ 8 - 2
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -203,6 +203,7 @@ class JobInProgress {
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
+    this.status.setJobPriority(this.priority);
     this.profile = new JobProfile(conf.getUser(), jobid, 
                                   jobFile.toString(), url, conf.getJobName(),
                                   conf.getQueueName());
@@ -416,7 +417,8 @@ class JobInProgress {
                        numReduceTasks, jobtracker, conf, this);
     cleanup[1].setCleanupTask();
 
-    this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
+    this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, 0.0f, 
+                                          JobStatus.RUNNING, status.getJobPriority());
     tasksInited.set(true);
         
     JobHistory.JobInfo.logStarted(profile.getJobID(), this.launchTime, 
@@ -476,6 +478,9 @@ class JobInProgress {
     } else {
       this.priority = priority;
     }
+    synchronized (this) {
+      status.setJobPriority(priority);
+    }
     // log and change to the job's priority
     JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
@@ -1746,7 +1751,8 @@ class JobInProgress {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
       this.status = new JobStatus(status.getJobID(),
-                          1.0f, 1.0f, 1.0f, JobStatus.FAILED);
+                          1.0f, 1.0f, 1.0f, JobStatus.FAILED,
+                          status.getJobPriority());
       this.finishTime = System.currentTimeMillis();
       JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
               this.finishedMapTasks, this.finishedReduceTasks);

+ 44 - 6
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
 
 /**************************************************
  * Describes the current status of a job.  This is
@@ -53,6 +54,7 @@ public class JobStatus implements Writable {
   private int runState;
   private long startTime;
   private String user;
+  private JobPriority priority;
   private String schedulingInfo="";
     
   /**
@@ -70,12 +72,8 @@ public class JobStatus implements Writable {
    */
   public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
                    float cleanupProgress, int runState) {
-    this.jobid = jobid;
-    this.mapProgress = mapProgress;
-    this.reduceProgress = reduceProgress;
-    this.cleanupProgress = cleanupProgress;
-    this.runState = runState;
-    this.user = "nobody";
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState,
+                  JobPriority.NORMAL);
   }
 
   /**
@@ -90,6 +88,27 @@ public class JobStatus implements Writable {
     this(jobid, mapProgress, reduceProgress, 0.0f, runState);
   }
 
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+   public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+                      float cleanupProgress, int runState, JobPriority jp) {
+     this.jobid = jobid;
+     this.mapProgress = mapProgress;
+     this.reduceProgress = reduceProgress;
+     this.cleanupProgress = cleanupProgress;
+     this.runState = runState;
+     this.user = "nobody";
+     if (jp == null) {
+       throw new IllegalArgumentException("Job Priority cannot be null.");
+     }
+     priority = jp;
+   }
   /**
    * @deprecated use getJobID instead
    */
@@ -190,6 +209,23 @@ public class JobStatus implements Writable {
     this.schedulingInfo = schedulingInfo;
   }
   
+  /**
+   * Return the priority of the job
+   * @return job priority
+   */
+   public synchronized JobPriority getJobPriority() { return priority; }
+  
+  /**
+   * Set the priority of the job, defaulting to NORMAL.
+   * @param jp new job priority
+   */
+   public synchronized void setJobPriority(JobPriority jp) {
+     if (jp == null) {
+       throw new IllegalArgumentException("Job priority cannot be null.");
+     }
+     priority = jp;
+   }
+  
   ///////////////////////////////////////
   // Writable
   ///////////////////////////////////////
@@ -201,6 +237,7 @@ public class JobStatus implements Writable {
     out.writeInt(runState);
     out.writeLong(startTime);
     Text.writeString(out, user);
+    WritableUtils.writeEnum(out, priority);
     Text.writeString(out, schedulingInfo);
   }
 
@@ -212,6 +249,7 @@ public class JobStatus implements Writable {
     this.runState = in.readInt();
     this.startTime = in.readLong();
     this.user = Text.readString(in);
+    this.priority = WritableUtils.readEnum(in, JobPriority.class);
     this.schedulingInfo = Text.readString(in);
   }
 }

+ 9 - 1
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -45,8 +45,9 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    *             cleanupProgress to JobStatus as part of HADOOP-3150
    * Version 13: Added getJobQueueInfos and getJobQueueInfo(queue name)
    *             and getAllJobs(queue) as a part of HADOOP-3930
+   * Version 14: Added setPriority for HADOOP-4124            
    */
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
 
   /**
    * Allocate a name for the job.
@@ -73,6 +74,13 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    */
   public void killJob(JobID jobid) throws IOException;
 
+  /**
+   * Set the priority of the specified job
+   * @param jobid ID of the job
+   * @param priority Priority to be set for the job
+   */
+  public void setJobPriority(JobID jobid, String priority) 
+                                                      throws IOException;
   /**
    * Kill indicated task attempt.
    * @param taskId the id of the task to kill.

+ 14 - 0
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -2231,6 +2231,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
 
+  /**
+   * Set the priority of a job
+   * @param jobid id of the job
+   * @param priority new priority of the job
+   */
+  public synchronized void setJobPriority(JobID jobid, 
+                                              String priority)
+                                                throws IOException {
+    JobInProgress job = jobs.get(jobid);
+    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    JobPriority newPriority = JobPriority.valueOf(priority);
+    setJobPriority(jobid, newPriority);
+  }
+                           
   public synchronized JobProfile getJobProfile(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
     if (job != null) {

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -310,6 +310,11 @@ class LocalJobRunner implements JobSubmissionProtocol {
     jobs.get(id).stop();
   }
 
+  public void setJobPriority(JobID id, String jp) throws IOException {
+    throw new UnsupportedOperationException("Changing job priority " +
+                      "in LocalJobRunner is not supported.");
+  }
+  
   /** Throws {@link UnsupportedOperationException} */
   public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
     throw new UnsupportedOperationException("Killing tasks in " +

+ 8 - 1
src/mapred/org/apache/hadoop/mapred/RunningJob.java

@@ -124,7 +124,14 @@ public interface RunningJob {
    * @throws IOException
    */
   public void killJob() throws IOException;
-    
+  
+  /**
+   * Set the priority of a running job.
+   * @param priority the new priority for the job.
+   * @throws IOException
+   */
+  public void setJobPriority(String priority) throws IOException;
+  
   /**
    * Get events indicating completion (success/failure) of component tasks.
    *  

+ 46 - 1
src/test/org/apache/hadoop/mapred/TestJobClient.java

@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.io.PrintStream;
 import java.io.Writer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -31,6 +38,9 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 public class TestJobClient extends ClusterMapReduceTestCase {
+  
+  private static final Log LOG = LogFactory.getLog(TestJobClient.class);
+  
   private String runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
     Writer wr = new OutputStreamWriter(os);
@@ -41,7 +51,8 @@ public class TestJobClient extends ClusterMapReduceTestCase {
 
     JobConf conf = createJobConf();
     conf.setJobName("mr");
-
+    conf.setJobPriority(JobPriority.HIGH);
+    
     conf.setInputFormat(TextInputFormat.class);
 
     conf.setMapOutputKeyClass(LongWritable.class);
@@ -82,4 +93,38 @@ public class TestJobClient extends ClusterMapReduceTestCase {
     assertEquals("Counter", "3", out.toString().trim());
   }
 
+  public void testJobList() throws Exception {
+    String jobId = runJob();
+    verifyJobPriority(jobId, "HIGH");
+  }
+
+  private void verifyJobPriority(String jobId, String priority)
+                            throws Exception {
+    PipedInputStream pis = new PipedInputStream();
+    PipedOutputStream pos = new PipedOutputStream(pis);
+    int exitCode = runTool(createJobConf(), new JobClient(),
+        new String[] { "-list", "all" },
+        pos);
+    assertEquals("Exit code", 0, exitCode);
+    BufferedReader br = new BufferedReader(new InputStreamReader(pis));
+    String line = null;
+    while ((line=br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (!line.startsWith(jobId)) {
+        continue;
+      }
+      assertTrue(line.contains(priority));
+      break;
+    }
+    pis.close();
+  }
+  
+  public void testChangingJobPriority() throws Exception {
+    String jobId = runJob();
+    int exitCode = runTool(createJobConf(), new JobClient(),
+        new String[] { "-set-priority", jobId, "VERY_LOW" },
+        new ByteArrayOutputStream());
+    assertEquals("Exit code", 0, exitCode);
+    verifyJobPriority(jobId, "VERY_LOW");
+  }
 }

+ 44 - 0
src/test/org/apache/hadoop/mapred/TestQueueManager.java

@@ -172,6 +172,13 @@ public class TestQueueManager extends TestCase {
     verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
   }
   
+  public void testUserDisabledForJobPriorityChange() throws IOException {
+    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
+                              "junk-user");
+    verifyJobPriorityChangeAsOtherUser(conf, false, 
+                              "junk-user,junk-user-group");
+  }
+  
   private JobConf setupConf(String aclName, String aclValue) {
     JobConf conf = new JobConf();
     conf.setBoolean("mapred.acls.enabled", true);
@@ -256,6 +263,7 @@ public class TestQueueManager extends TestCase {
     }
   }
 
+  
   private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
                                         String otherUserInfo) 
                         throws IOException {
@@ -295,6 +303,42 @@ public class TestQueueManager extends TestCase {
     }
   }
   
+  private void verifyJobPriorityChangeAsOtherUser(JobConf conf, 
+                          boolean shouldSucceed, String otherUserInfo)
+                            throws IOException {
+    setUpCluster(conf);
+    try {
+      // submit job as another user.
+      String userInfo = otherUserInfo;
+      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+      assertFalse(rjob.isComplete());
+      
+      // try to change priority as self
+      try {
+        rjob.setJobPriority("VERY_LOW");
+        if (!shouldSucceed) {
+          fail("changing priority should fail.");
+        }
+      } catch (IOException ioe) {
+        //verify it fails
+        LOG.info("exception while submitting job: " + ioe.getMessage());
+        assertTrue(ioe.getMessage().
+                        contains("cannot perform operation " +
+                                    "ADMINISTER_JOBS on queue default"));
+      }
+      //wait for job to complete on its own
+      while (!rjob.isComplete()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
   private void setUpCluster(JobConf conf) throws IOException {
     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = miniDFSCluster.getFileSystem();

+ 19 - 15
src/webapps/job/jobdetails.jsp

@@ -159,7 +159,9 @@
     JobInProgress job = (JobInProgress) tracker.getJob(jobIdObj);
     
     String action = request.getParameter("action");
-    if("changeprio".equalsIgnoreCase(action) && request.getMethod().equalsIgnoreCase("POST")) {
+    if(JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY, false) && 
+        "changeprio".equalsIgnoreCase(action) 
+        && request.getMethod().equalsIgnoreCase("POST")) {
       tracker.setJobPriority(jobIdObj, 
                              JobPriority.valueOf(request.getParameter("prio")));
     }
@@ -335,21 +337,23 @@ if("off".equals(session.getAttribute("map.graph"))) { %>
 <%} }%>
 
 <hr>
-<table border="0"> <tr> <td>
-Change priority from <%=job.getPriority()%> to:
-<form action="jobdetails.jsp" method="post">
-<input type="hidden" name="action" value="changeprio"/>
-<input type="hidden" name="jobid" value="<%=jobId%>"/>
-</td><td> <select name="prio"> 
-<%
-  JobPriority jobPrio = job.getPriority();
-  for (JobPriority prio : JobPriority.values()) {
-    if(jobPrio != prio) {
-      %> <option value=<%=prio%>><%=prio%></option> <%
+<% if(JSPUtil.conf.getBoolean(PRIVATE_ACTIONS_KEY, false)) { %>
+  <table border="0"> <tr> <td>
+  Change priority from <%=job.getPriority()%> to:
+  <form action="jobdetails.jsp" method="post">
+  <input type="hidden" name="action" value="changeprio"/>
+  <input type="hidden" name="jobid" value="<%=jobId%>"/>
+  </td><td> <select name="prio"> 
+  <%
+    JobPriority jobPrio = job.getPriority();
+    for (JobPriority prio : JobPriority.values()) {
+      if(jobPrio != prio) {
+        %> <option value=<%=prio%>><%=prio%></option> <%
+      }
     }
-  }
-%>
-</select> </td><td><input type="submit" value="Submit"> </form></td></tr> </table>
+  %>
+  </select> </td><td><input type="submit" value="Submit"> </form></td></tr> </table>
+<% } %>
 
 <table border="0"> <tr>