Browse Source

MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1237543 13f79535-47bb-0310-9956-ffa450edef68
Amar Kamat 13 years ago
parent
commit
5652e71992

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -12,6 +12,8 @@ Trunk (unreleased changes)
     (Plamen Jeliazkov via shv)
 
   IMPROVEMENTS
+    MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
+
     MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the 
                     job-history related information.
 

+ 9 - 2
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java

@@ -101,13 +101,15 @@ public class Statistics implements Component<Job> {
     }
     
     int maps = 0;
+    int reds = 0;
     if (jobdesc == null) {
       throw new IllegalArgumentException(
         " JobStory not available for job " + job.getJobName());
     } else {
       maps = jobdesc.getNumberMaps();
+      reds = jobdesc.getNumberReduces();
     }
-    JobStats stats = new JobStats(maps,job);
+    JobStats stats = new JobStats(maps, reds, job);
     jobMaps.put(seq,stats);
   }
 
@@ -258,15 +260,20 @@ public class Statistics implements Component<Job> {
    */
   static class JobStats {
     private int noOfMaps;
+    private int noOfReds;
     private Job job;
 
-    public JobStats(int noOfMaps,Job job){
+    public JobStats(int noOfMaps,int numOfReds, Job job){
       this.job = job;
       this.noOfMaps = noOfMaps;
+      this.noOfReds = numOfReds;
     }
     public int getNoOfMaps() {
       return noOfMaps;
     }
+    public int getNoOfReds() {
+      return noOfReds;
+    }
 
     /**
      * Returns the job ,

+ 165 - 60
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java

@@ -31,13 +31,12 @@ import org.apache.hadoop.tools.rumen.JobStoryProducer;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
   public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
 
   private final LoadStatus loadStatus = new LoadStatus();
-  private final Condition condUnderloaded = this.lock.newCondition();
   /**
    * The minimum ratio between pending+running map tasks (aka. incomplete map
    * tasks) and cluster map slot capacity for us to consider the cluster is
@@ -150,23 +149,32 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
         }
         LOG.info("START STRESS @ " + System.currentTimeMillis());
         while (!Thread.currentThread().isInterrupted()) {
-          lock.lock();
           try {
             while (loadStatus.overloaded()) {
-              //Wait while JT is overloaded.
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster overloaded in run! Sleeping...");
+              }
+              // sleep 
               try {
-                condUnderloaded.await();
+                Thread.sleep(1000);
               } catch (InterruptedException ie) {
                 return;
               }
             }
 
             while (!loadStatus.overloaded()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster underloaded in run! Stressing...");
+              }
               try {
+                //TODO This in-line read can block submission for large jobs.
                 final JobStory job = getNextJobFiltered();
                 if (null == job) {
                   return;
                 }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Job Selected: " + job.getJobID());
+                }
                 submitter.add(
                   jobCreator.createGridmixJob(
                     conf, 0L, job, scratch, 
@@ -175,14 +183,20 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
                     sequence.getAndIncrement()));
                 // TODO: We need to take care of scenario when one map/reduce
                 // takes more than 1 slot.
-                loadStatus.mapSlotsBackfill -= 
-                  calcEffectiveIncompleteMapTasks(
-                    loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f);
-                loadStatus.reduceSlotsBackfill -= 
-                  calcEffectiveIncompleteReduceTasks(
-                    loadStatus.reduceSlotCapacity, job.getNumberReduces(), 
-                    0.0f);
-                --loadStatus.numJobsBackfill;
+                
+                // Deduct the submitted job's load from loadStatus
+                int incompleteMapTasks = (int) calcEffectiveIncompleteMapTasks(
+                                                 loadStatus.getMapCapacity(), 
+                                                 job.getNumberMaps(), 0.0f);
+                loadStatus.decrementMapLoad(incompleteMapTasks);
+                
+                int incompleteReduceTasks = 
+                  (int) calcEffectiveIncompleteReduceTasks(
+                          loadStatus.getReduceCapacity(), 
+                          job.getNumberReduces(), 0.0f);
+                loadStatus.decrementReduceLoad(incompleteReduceTasks);
+                  
+                loadStatus.decrementJobLoad(1);
               } catch (IOException e) {
                 LOG.error("Error while submitting the job ", e);
                 error = e;
@@ -191,7 +205,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
 
             }
           } finally {
-            lock.unlock();
+            // do nothing
           }
         }
       } catch (InterruptedException e) {
@@ -210,19 +224,11 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
    */
   @Override
   public void update(Statistics.ClusterStats item) {
-    lock.lock();
+    ClusterStatus clusterMetrics = item.getStatus();
     try {
-      ClusterStatus clusterMetrics = item.getStatus();
-      try {
-        checkLoadAndGetSlotsToBackfill(item,clusterMetrics);
-      } catch (Exception e) {
-        LOG.error("Couldn't get the new Status",e);
-      }
-      if (!loadStatus.overloaded()) {
-        condUnderloaded.signalAll();
-      }
-    } finally {
-      lock.unlock();
+      checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
+    } catch (Exception e) {
+      LOG.error("Couldn't get the new Status",e);
     }
   }
 
@@ -254,18 +260,25 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
    */
   private void checkLoadAndGetSlotsToBackfill(
     ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException {
-    loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks();
-    loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
     
+    // update the max cluster capacity in case it has changed
+    int mapCapacity = clusterStatus.getMaxMapTasks();
+    loadStatus.updateMapCapacity(mapCapacity);
+    
+    int reduceCapacity = clusterStatus.getMaxReduceTasks();
     
-    loadStatus.numJobsBackfill = 
-      (int) (maxJobTrackerRatio * clusterStatus.getTaskTrackers())
-        - stats.getNumRunningJob();
-    if (loadStatus.numJobsBackfill <= 0) {
+    loadStatus.updateReduceCapacity(reduceCapacity);
+    
+    int numTrackers = clusterStatus.getTaskTrackers();
+    
+    int jobLoad = 
+      (int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob();
+    loadStatus.updateJobLoad(jobLoad);
+    if (loadStatus.getJobLoad() <= 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+        LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is "
                   + Boolean.TRUE.toString() + " NumJobsBackfill is "
-                  + loadStatus.numJobsBackfill);
+                  + loadStatus.getJobLoad());
       }
       return; // stop calculation because we know it is overloaded.
     }
@@ -275,56 +288,84 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       float mapProgress = job.getJob().mapProgress();
       int noOfMaps = job.getNoOfMaps();
       incompleteMapTasks += 
-        calcEffectiveIncompleteMapTasks(
-          clusterStatus.getMaxMapTasks(), noOfMaps, mapProgress);
+        calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
     }
-    loadStatus.mapSlotsBackfill = 
-    (int) ((overloadMapTaskMapSlotRatio * clusterStatus.getMaxMapTasks()) 
-           - incompleteMapTasks);
-    if (loadStatus.mapSlotsBackfill <= 0) {
+    
+    int mapSlotsBackFill = 
+      (int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks);
+    loadStatus.updateMapLoad(mapSlotsBackFill);
+    
+    if (loadStatus.getMapLoad() <= 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+        LOG.debug(System.currentTimeMillis() + " [MAP-LOAD] Overloaded is "
                   + Boolean.TRUE.toString() + " MapSlotsBackfill is "
-                  + loadStatus.mapSlotsBackfill);
+                  + loadStatus.getMapLoad());
       }
       return; // stop calculation because we know it is overloaded.
     }
 
     float incompleteReduceTasks = 0; // include pending & running reduce tasks.
     for (JobStats job : ClusterStats.getRunningJobStats()) {
-      int noOfReduces = job.getJob().getNumReduceTasks();
+      // Use the reduce count cached in JobStats
+      int noOfReduces = job.getNoOfReds();
       if (noOfReduces > 0) {
         float reduceProgress = job.getJob().reduceProgress();
         incompleteReduceTasks += 
-          calcEffectiveIncompleteReduceTasks(
-            clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
+          calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces, 
+                                             reduceProgress);
       }
     }
-    loadStatus.reduceSlotsBackfill = 
-      (int) ((overloadReduceTaskReduceSlotRatio * clusterStatus.getMaxReduceTasks()) 
+    
+    int reduceSlotsBackFill = 
+      (int)((overloadReduceTaskReduceSlotRatio * reduceCapacity) 
              - incompleteReduceTasks);
-    if (loadStatus.reduceSlotsBackfill <= 0) {
+    loadStatus.updateReduceLoad(reduceSlotsBackFill);
+    if (loadStatus.getReduceLoad() <= 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+        LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is "
                   + Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
-                  + loadStatus.reduceSlotsBackfill);
+                  + loadStatus.getReduceLoad());
       }
       return; // stop calculation because we know it is overloaded.
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(System.currentTimeMillis() + " Overloaded is "
+      LOG.debug(System.currentTimeMillis() + " [OVERALL] Overloaded is "
                 + Boolean.FALSE.toString() + "Current load Status is " 
                 + loadStatus);
     }
   }
 
   static class LoadStatus {
-    int mapSlotsBackfill;
-    int mapSlotCapacity;
-    int reduceSlotsBackfill;
-    int reduceSlotCapacity;
-    int numJobsBackfill;
+    /**
+     * Additional number of map slots that can be requested before
+     * declaring (by Gridmix STRESS mode) the cluster as overloaded. 
+     */
+    private volatile int mapSlotsBackfill;
+    
+    /**
+     * Determines the total map slot capacity of the cluster.
+     */
+    private volatile int mapSlotCapacity;
+    
+    /**
+     * Additional number of reduce slots that can be requested before
+     * declaring (by Gridmix STRESS mode) the cluster as overloaded.
+     */
+    private volatile int reduceSlotsBackfill;
+    
+    /**
+     * Determines the total reduce slot capacity of the cluster.
+     */
+    private volatile int reduceSlotCapacity;
+
+    /**
+     * Determines the max count of running jobs in the cluster.
+     */
+    private volatile int numJobsBackfill;
+    
+    // set the default to true
+    private AtomicBoolean overloaded = new AtomicBoolean(true);
 
     /**
      * Construct the LoadStatus in an unknown state - assuming the cluster is
@@ -339,12 +380,76 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       reduceSlotCapacity = -1;
     }
     
-    public boolean overloaded() {
-      return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
-             || (numJobsBackfill <= 0);
+    public synchronized int getMapLoad() {
+      return mapSlotsBackfill;
+    }
+    
+    public synchronized int getMapCapacity() {
+      return mapSlotCapacity;
+    }
+    
+    public synchronized int getReduceLoad() {
+      return reduceSlotsBackfill;
+    }
+    
+    public synchronized int getReduceCapacity() {
+      return reduceSlotCapacity;
+    }
+    
+    public synchronized int getJobLoad() {
+      return numJobsBackfill;
+    }
+    
+    public synchronized void decrementMapLoad(int mapSlotsConsumed) {
+      this.mapSlotsBackfill -= mapSlotsConsumed;
+      updateOverloadStatus();
+    }
+    
+    public synchronized void decrementReduceLoad(int reduceSlotsConsumed) {
+      this.reduceSlotsBackfill -= reduceSlotsConsumed;
+      updateOverloadStatus();
+    }
+
+    public synchronized void decrementJobLoad(int numJobsConsumed) {
+      this.numJobsBackfill -= numJobsConsumed;
+      updateOverloadStatus();
+    }
+    
+    public synchronized void updateMapCapacity(int mapSlotsCapacity) {
+      this.mapSlotCapacity = mapSlotsCapacity;
+      updateOverloadStatus();
+    }
+    
+    public synchronized void updateReduceCapacity(int reduceSlotsCapacity) {
+      this.reduceSlotCapacity = reduceSlotsCapacity;
+      updateOverloadStatus();
+    }
+    
+    public synchronized void updateMapLoad(int mapSlotsBackfill) {
+      this.mapSlotsBackfill = mapSlotsBackfill;
+      updateOverloadStatus();
+    }
+    
+    public synchronized void updateReduceLoad(int reduceSlotsBackfill) {
+      this.reduceSlotsBackfill = reduceSlotsBackfill;
+      updateOverloadStatus();
+    }
+    
+    public synchronized void updateJobLoad(int numJobsBackfill) {
+      this.numJobsBackfill = numJobsBackfill;
+      updateOverloadStatus();
+    }
+    
+    private synchronized void updateOverloadStatus() {
+      overloaded.set((mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
+                     || (numJobsBackfill <= 0));
+    }
+    
+    public synchronized boolean overloaded() {
+      return overloaded.get();
     }
     
-    public String toString() {
+    public synchronized String toString() {
     // TODO Use StringBuilder instead
       return " Overloaded = " + overloaded()
              + ", MapSlotBackfill = " + mapSlotsBackfill 

+ 16 - 7
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

@@ -101,10 +101,17 @@ public class TestGridmixSubmission {
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
 
-    public void verify(ArrayList<JobStory> submitted) throws Exception {
+    public void verify(ArrayList<JobStory> submitted, Configuration clientConf) 
+    throws Exception {
       final ArrayList<Job> succeeded = new ArrayList<Job>();
       assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
       final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+      
+      // define the input and output path for the run
+      final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+      final Path out = 
+        new Path(in, clientConf.get(Gridmix.GRIDMIX_OUT_DIR, "gridmix"));
+      
       for (JobStory spec : submitted) {
         sub.put(spec.getJobID().toString(), spec);
       }
@@ -115,8 +122,7 @@ public class TestGridmixSubmission {
         Configuration conf = job.getConfiguration();
         if (GenerateData.JOB_NAME.equals(jobName)) {
           verifyQueue(conf, jobName);
-          final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
-          final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
+          
           final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
           assertTrue("Mismatched data gen", // +/- 100k for logs
               (GENDATA << 20) < generated.getLength() + GENSLOP ||
@@ -164,7 +170,7 @@ public class TestGridmixSubmission {
 
         final FileStatus stat = 
           GridmixTestUtils.dfs.getFileStatus(
-            new Path(GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum)));
+            new Path(out, "" + Integer.valueOf(jobSeqNum)));
         assertEquals("Wrong owner for " + jobName, spec.getUser(),
                      stat.getOwner());
 
@@ -337,8 +343,9 @@ public class TestGridmixSubmission {
     private JobFactory factory;
     private TestMonitor monitor;
 
-    public void checkMonitor() throws Exception {
-      monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted());
+    public void checkMonitor(Configuration conf) throws Exception {
+      monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted(), 
+                     conf);
     }
 
     @Override
@@ -534,9 +541,11 @@ public class TestGridmixSubmission {
       GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
       int res = ToolRunner.run(conf, client, argv);
       assertEquals("Client exited with nonzero status", 0, res);
-      client.checkMonitor();
+      client.checkMonitor(conf);
     } catch (Exception e) {
       e.printStackTrace();
+      // fail the test if there is an exception
+      throw new RuntimeException(e);
     } finally {
       in.getFileSystem(conf).delete(in, true);
       out.getFileSystem(conf).delete(out, true);

+ 1 - 1
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java

@@ -338,7 +338,7 @@ public class TestGridmixSummary {
         return isSuccessful;
       };
     };
-    return new JobStats(numMaps, fakeJob);
+    return new JobStats(numMaps, numReds, fakeJob);
   }
   
   /**