
HADOOP-37: Add ClusterStatus. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@377798 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
9458a936ef

+ 82 - 0
src/java/org/apache/hadoop/mapred/ClusterStatus.java

@@ -0,0 +1,82 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableFactories;
+
+/**
+ * Summarizes the size and current state of the cluster.
+ * @author Owen O'Malley
+ */
+public class ClusterStatus implements Writable {
+
+  static {                                        // register a ctor
+    WritableFactories.setFactory
+      (ClusterStatus.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new ClusterStatus(); }
+       });
+  }
+
+  private int task_trackers;
+  private int map_tasks;
+  private int reduce_tasks;
+  private int max_tasks;
+
+  private ClusterStatus() {}
+  
+  ClusterStatus(int trackers, int maps, int reduces, int max) {
+    task_trackers = trackers;
+    map_tasks = maps;
+    reduce_tasks = reduces;
+    max_tasks = max;
+  }
+  
+
+  /**
+   * The number of task trackers in the cluster.
+   */
+  public int getTaskTrackers() {
+    return task_trackers;
+  }
+  
+  /**
+   * The number of currently running map tasks.
+   */
+  public int getMapTasks() {
+    return map_tasks;
+  }
+  
+  /**
+   * The number of currently running reduce tasks.
+   */
+  public int getReduceTasks() {
+    return reduce_tasks;
+  }
+  
+  /**
+   * The maximum capacity for running tasks in the cluster.
+   */
+  public int getMaxTasks() {
+    return max_tasks;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(task_trackers);
+    out.writeInt(map_tasks);
+    out.writeInt(reduce_tasks);
+    out.writeInt(max_tasks);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    task_trackers = in.readInt();
+    map_tasks = in.readInt();
+    reduce_tasks = in.readInt();
+    max_tasks = in.readInt();
+  }
+
+}
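
As a quick illustration of the Writable contract implemented above, here is a minimal round-trip sketch, not part of the patch. The class name and sample counts are invented, and it assumes it lives in the org.apache.hadoop.mapred package so it can reach the package-private constructor.

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical round-trip check for the write/readFields methods above.
public class ClusterStatusRoundTrip {
  public static void main(String[] args) throws IOException {
    // Sample values are invented for illustration.
    ClusterStatus before = new ClusterStatus(5, 10, 4, 40);

    // Serialize with write(DataOutput).
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    before.write(new DataOutputStream(bytes));

    // Deserialize with readFields(DataInput) into a fresh instance.
    ClusterStatus after = new ClusterStatus(0, 0, 0, 0);
    after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(after.getTaskTrackers() + " trackers, "
        + after.getMapTasks() + " maps, "
        + after.getReduceTasks() + " reduces, max "
        + after.getMaxTasks());
  }
}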

+ 4 - 0
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -279,6 +279,10 @@ public class JobClient implements MRConstants {
         }
     }
 
+    public ClusterStatus getClusterStatus() throws IOException {
+      return jobSubmitClient.getClusterStatus();
+    }
+    
     /** Utility that submits a job, then polls for progress until the job is
      * complete. */
     public static void runJob(JobConf job) throws IOException {
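
For context, a minimal sketch of how a caller might use the new accessor. It is not part of the patch; it assumes a JobClient(Configuration) constructor and a configuration whose job tracker address points at a running JobTracker.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;

// Hypothetical caller, not part of the patch. Assumes JobClient can be
// constructed from a Configuration and that a JobTracker is reachable.
public class PrintClusterStatus {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    JobClient client = new JobClient(conf);

    ClusterStatus status = client.getClusterStatus();
    System.out.println("task trackers:   " + status.getTaskTrackers());
    System.out.println("running maps:    " + status.getMapTasks());
    System.out.println("running reduces: " + status.getReduceTasks());
    System.out.println("max tasks:       " + status.getMaxTasks());
  }
}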

+ 6 - 0
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -31,6 +31,12 @@ interface JobSubmissionProtocol {
      */
     public JobStatus submitJob(String jobFile) throws IOException;
 
+    /**
+     * Get the current status of the cluster.
+     * @return summary of the state of the cluster
+     */
+    public ClusterStatus getClusterStatus();
+    
     /**
      * Kill the indicated job
      */

+ 7 - 0
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -651,6 +651,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         return job.getStatus();
     }
 
+    public synchronized ClusterStatus getClusterStatus() {
+        return new ClusterStatus(taskTrackers.size(),
+                                 totalMaps,
+                                 totalReduces,
+                                 maxCurrentTasks);
+    }
+    
     public synchronized void killJob(String jobid) {
         JobInProgress job = (JobInProgress) jobs.get(jobid);
         job.kill();

+ 10 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -32,6 +32,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
   private FileSystem fs;
   private HashMap jobs = new HashMap();
   private Configuration conf;
+  private int map_tasks = 0;
+  private int reduce_tasks = 0;
 
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
@@ -73,7 +75,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
           mapIds.add("map_" + newId());
           MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
           map.setConf(job);
+          map_tasks += 1;
           map.run(job, this);
+          map_tasks -= 1;
         }
 
         // move map output to reduce input
@@ -98,7 +102,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
                          mapDependencies,
                          0);
         reduce.setConf(job);
+        reduce_tasks += 1;
         reduce.run(job, this);
+        reduce_tasks -= 1;
         this.mapoutputFile.removeAll(reduceId);
         
         this.status.runState = JobStatus.SUCCEEDED;
@@ -184,4 +190,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public String getFilesystemName() throws IOException {
     return fs.getName();
   }
+  
+  public ClusterStatus getClusterStatus() {
+    return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
+  }
 }
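
Finally, a hedged sketch of what the LocalJobRunner implementation above implies for local mode. Setting mapred.job.tracker to "local" is assumed here to route JobClient to LocalJobRunner; the class name and expected counts are illustrative, not asserted by the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;

// Hypothetical local-mode check, not part of the patch. Assumes
// mapred.job.tracker = "local" selects LocalJobRunner.
public class LocalClusterStatus {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker", "local");

    JobClient client = new JobClient(conf);
    ClusterStatus status = client.getClusterStatus();

    // With no job running, the implementation above should report
    // 1 tracker, 0 maps, 0 reduces, and a max capacity of 1.
    System.out.println(status.getTaskTrackers() + " tracker(s), "
        + status.getMapTasks() + " running map(s), "
        + status.getReduceTasks() + " running reduce(s), max "
        + status.getMaxTasks());
  }
}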