Browse Source

commit 6ba9a15e4d54612f0f5b6170ca4072a44136b77a
Author: Arun C Murthy <acmurthy@apache.org>
Date: Wed Sep 15 09:55:16 2010 -0700

MAPREDUCE-1943. Ensure counter limits are enforced across counters of all tasks. Contributed by Mahadev Konar.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077677 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
de49b537d2

+ 7 - 0
src/mapred/mapred-default.xml

@@ -1198,6 +1198,13 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.counters.limit</name>
+  <value>120</value>
+  <description>Limit on the number of counters allowed per job.
+  </description>
+</property>
+
 <!--  end of node health script variables -->
 
 </configuration>

+ 5 - 2
src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java

@@ -178,8 +178,11 @@ class CompletedJobStatusStore implements Runnable {
         job.getStatus().write(dataOut);
 
         job.getProfile().write(dataOut);
-
-        job.getCounters().write(dataOut);
+        
+        Counters counters = new Counters();
+        boolean isFine = job.getCounters(counters);
+        counters = (isFine? counters: new Counters());
+        counters.write(dataOut);
 
         TaskCompletionEvent[] events = 
                 job.getTaskCompletionEvents(0, Integer.MAX_VALUE);

+ 21 - 4
src/mapred/org/apache/hadoop/mapred/Counters.java

@@ -65,14 +65,18 @@ public class Counters implements Writable, Iterable<Counters.Group> {
   private static final int GROUP_NAME_LIMIT = 128;
   /** limit on the size of the counter name **/
   private static final int COUNTER_NAME_LIMIT = 64;
-  /** the max number of counters **/
-  static final int MAX_COUNTER_LIMIT = 80;
+  
+  private static final JobConf conf = new JobConf();
+  /** limit on counters **/
+  public static int MAX_COUNTER_LIMIT = 
+    conf.getInt("mapreduce.job.counters.limit", 120);
+
   /** the max groups allowed **/
   static final int MAX_GROUP_LIMIT = 50;
   
   /** the number of current counters**/
   private int numCounters = 0;
-  
+
   //private static Log log = LogFactory.getLog("Counters.class");
   
   /**
@@ -308,7 +312,7 @@ public class Counters implements Writable, Iterable<Counters.Group> {
         }
         numCounters = (numCounters == 0) ? Counters.this.size(): numCounters; 
         if (numCounters >= MAX_COUNTER_LIMIT) {
-          throw new RuntimeException("Error: Exceeded limits on number of counters - " 
+          throw new CountersExceededException("Error: Exceeded limits on number of counters - " 
               + "Counters=" + numCounters + " Limit=" + MAX_COUNTER_LIMIT);
         }
         result = new Counter(shortName, localize(shortName + ".name", shortName), 0L);
@@ -799,4 +803,17 @@ public class Counters implements Writable, Iterable<Counters.Group> {
     }
     return isEqual;
   }
+  
+  /**
+   * Counter exception thrown when the number of counters exceed 
+   * the limit
+   */
+  public static class CountersExceededException extends RuntimeException {
+  
+    private static final long serialVersionUID = 1L;
+
+    public CountersExceededException(String msg) {
+      super(msg);
+    }
+  }
 }

+ 7 - 1
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1293,7 +1293,13 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       }
     }
     LOG.info("Job complete: " + jobId);
-    Counters counters = job.getCounters();
+    Counters counters = null;
+    try{
+       counters = job.getCounters();
+    } catch(IOException ie) {
+      counters = null;
+      LOG.info(ie.getMessage());
+    }
     if (counters != null) {
       counters.log(LOG);
     }

+ 78 - 19
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -36,7 +36,6 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,6 +45,8 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.Counters.CountersExceededException;
+import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
@@ -116,7 +117,7 @@ public class JobInProgress {
   long reduce_input_limit = -1L;
   private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   int completedMapsForReduceSlowstart = 0;
-  
+    
   // runningMapTasks include speculative tasks, so we need to capture 
   // speculative tasks separately 
   int speculativeMapTasks = 0;
@@ -271,7 +272,7 @@ public class JobInProgress {
     FALLOW_SLOTS_MILLIS_REDUCES
   }
   private Counters jobCounters = new Counters();
-  
+    
   // Maximum no. of fetch-failure notifications after which
   // the map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -1010,6 +1011,7 @@ public class JobInProgress {
     }
     return results;
   }
+  
 
   ////////////////////////////////////////////////////
   // Status update methods
@@ -1206,27 +1208,47 @@ public class JobInProgress {
   
   /**
    *  Returns map phase counters by summing over all map tasks in progress.
+   *  This method returns true if counters are within limit or false.
    */
-  public synchronized Counters getMapCounters() {
-    return incrementTaskCounters(new Counters(), maps);
+  public synchronized boolean getMapCounters(Counters counters) {
+    try {
+      counters = incrementTaskCounters(counters, maps);
+    } catch(CountersExceededException ce) {
+      LOG.info("Counters Exceeded for Job: " + jobId, ce);
+      return false;
+    }
+    return true;
   }
     
   /**
    *  Returns map phase counters by summing over all map tasks in progress.
+   *  This method returns true if counters are within limits and false otherwise.
    */
-  public synchronized Counters getReduceCounters() {
-    return incrementTaskCounters(new Counters(), reduces);
+  public synchronized boolean getReduceCounters(Counters counters) {
+    try {
+      counters = incrementTaskCounters(counters, reduces);
+    } catch(CountersExceededException ce) {
+      LOG.info("Counters Exceeded for Job: " + jobId, ce);
+      return false;
+    }
+    return true;
   }
     
   /**
    *  Returns the total job counters, by adding together the job, 
-   *  the map and the reduce counters.
+   *  the map and the reduce counters. This method returns true if
+   *  counters are within limits and false otherwise.
    */
-  public synchronized Counters getCounters() {
-    Counters result = new Counters();
-    result.incrAllCounters(getJobCounters());
-    incrementTaskCounters(result, maps);
-    return incrementTaskCounters(result, reduces);
+  public synchronized boolean getCounters(Counters result) {
+    try {
+      result.incrAllCounters(getJobCounters());
+      incrementTaskCounters(result, maps);
+      incrementTaskCounters(result, reduces);
+    } catch(CountersExceededException ce) {
+      LOG.info("Counters Exceeded for Job: " + jobId, ce);
+      return false;
+    }
+    return true;
   }
     
   /**
@@ -2566,6 +2588,9 @@ public class JobInProgress {
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
         this.status.setMapProgress(1.0f);
+        if (canLaunchJobCleanupTask()) {
+          checkCounterLimitsAndFail();
+        }
       }
     } else {
       runningReduceTasks -= 1;
@@ -2578,12 +2603,33 @@ public class JobInProgress {
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
         this.status.setReduceProgress(1.0f);
+        if (canLaunchJobCleanupTask()) {
+          checkCounterLimitsAndFail();
+        }
       }
     }
-    
     return true;
   }
-
+  
+  /**
+   * add up the counters and fail the job
+   * if it exceeds the counters. Make sure we do not
+   * recalculate the coutners after we fail the job. Currently
+   * this is taken care by terminateJob() since it does not 
+   * calculate the counters.
+   */
+  private void checkCounterLimitsAndFail() {
+    boolean mapIsFine, reduceIsFine, jobIsFine = true;
+    mapIsFine = getMapCounters(new Counters());
+    reduceIsFine = getReduceCounters(new Counters());
+    jobIsFine = getCounters(new Counters());
+    if (!(mapIsFine && reduceIsFine && jobIsFine)) {
+      status.setFailureInfo("Counters Exceeded: Limit: " + 
+          Counters.MAX_COUNTER_LIMIT);
+      jobtracker.failJob(this);
+    }
+  }
+  
   /**
    * Job state change must happen thru this call
    */
@@ -2627,20 +2673,33 @@ public class JobInProgress {
       if (reduces.length == 0) {
         this.status.setReduceProgress(1.0f);
       }
+     
       this.finishTime = jobtracker.getClock().getTime();
       LOG.info("Job " + this.status.getJobID() + 
-               " has completed successfully.");
-      
+      " has completed successfully.");
+
       // Log the job summary (this should be done prior to logging to 
       // job-history to ensure job-counters are in-sync 
       JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
       
+      Counters mapCounters = new Counters();
+      boolean isFine = getMapCounters(mapCounters);
+      mapCounters = (isFine ? mapCounters: new Counters());
+      Counters reduceCounters = new Counters();
+      isFine = getReduceCounters(reduceCounters);;
+      reduceCounters = (isFine ? reduceCounters: new Counters());
+      Counters jobCounters = new Counters();
+      isFine = getCounters(jobCounters);
+      jobCounters = (isFine? jobCounters: new Counters());
+      
       // Log job-history
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks, failedMapTasks, 
-                                     failedReduceTasks, getMapCounters(),
-                                     getReduceCounters(), getCounters());
+                                     failedReduceTasks, mapCounters,
+                                     reduceCounters, jobCounters);
+      
+      
       // Note that finalize will close the job history handles which garbage collect
       // might try to finalize
       garbageCollect();

+ 18 - 4
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
+import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
@@ -133,7 +134,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long RETIRE_JOB_INTERVAL;
   static long RETIRE_JOB_CHECK_INTERVAL;
-
+  
   private final long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
   private final DelegationTokenSecretManager secretManager;
 
@@ -561,7 +562,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
 
     synchronized void addToCache(JobInProgress job) {
-      RetireJobInfo info = new RetireJobInfo(job.getCounters(), job.getStatus(),
+      Counters counters = new Counters();
+      boolean isFine = job.getCounters(counters);
+      counters = (isFine? counters: new Counters());
+      RetireJobInfo info = new RetireJobInfo(counters, job.getStatus(),
           job.getProfile(), job.getFinishTime(), job.getHistoryFile());
       jobRetireInfoQ.add(info);
       jobIDStatusMap.put(info.status.getJobID(), info);
@@ -4311,8 +4315,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
         // check the job-access
         aclsManager.checkAccess(job, callerUGI, Operation.VIEW_JOB_COUNTERS);
-
-        return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
+        Counters counters = new Counters();
+        if (isJobInited(job)) {
+          boolean isFine = job.getCounters(counters);
+          if (!isFine) {
+            throw new IOException("Counters Exceeded limit: " + 
+                Counters.MAX_COUNTER_LIMIT);
+          }
+          return counters;
+        }
+        else {
+          return EMPTY_COUNTERS;
+        }
       } else {
         RetireJobInfo info = retireJobs.get(jobid);
         if (info != null) {

+ 6 - 3
src/test/org/apache/hadoop/mapred/TestJobHistory.java

@@ -559,15 +559,18 @@ public class TestJobHistory extends TestCase {
                values.get(Keys.USER)));
 
     // Validate job counters
-    Counters c = jip.getCounters();
+    Counters c = new Counters();
+    jip.getCounters(c);
     assertTrue("Counters of job obtained from history file did not " +
                "match the expected value",
                c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
-    Counters m = jip.getMapCounters();
+    Counters m = new Counters();
+    jip.getMapCounters(m);
     assertTrue("Map Counters of job obtained from history file did not " +
                "match the expected value", m.makeEscapedCompactString().
                equals(values.get(Keys.MAP_COUNTERS)));
-    Counters r = jip.getReduceCounters();
+    Counters r = new Counters();
+    jip.getReduceCounters(r);
     assertTrue("Reduce Counters of job obtained from history file did not " +
                "match the expected value", r.makeEscapedCompactString().
                equals(values.get(Keys.REDUCE_COUNTERS)));

+ 33 - 4
src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java

@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.Iterator;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -37,11 +38,27 @@ import org.apache.hadoop.util.StringUtils;
 import org.hsqldb.lib.StringUtil;
 
 public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
+  protected void setUp() throws Exception {
+    super.setUp();
+    Properties prop = new Properties();
+    prop.put("mapred.job.tracker.persist.jobstatus.active", "true");
+    prop.put("mapred.job.tracker.persist.jobstatus.hours", "1");
+    startCluster(true, prop);
+  }
   
   enum EnumCounter { MAP_RECORDS }
   
   static class CountingMapper<K, V> extends IdentityMapper<K, V> {
-
+    private JobConf jconf;
+    boolean generateUniqueCounters = false;
+    
+    @Override
+    public void configure(JobConf jconf) {
+      this.jconf = jconf;
+      this.generateUniqueCounters = 
+        jconf.getBoolean("task.generate.unique.counters", false);
+    }
+    
     public void map(K key, V value,
         OutputCollector<K, V> output, Reporter reporter)
         throws IOException {
@@ -49,8 +66,14 @@ public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
       reporter.incrCounter(EnumCounter.MAP_RECORDS, 1);
       reporter.incrCounter("StringCounter", "MapRecords", 1);
       for (int i =0; i < 50; i++) {
-        reporter.incrCounter("StringCounter", "countername" + i, 1);
-      }      
+        if (generateUniqueCounters) {
+          reporter.incrCounter("StringCounter", "countername_" + 
+              jconf.get("mapred.task.id") + "_"+ i, 1);
+        } else {
+          reporter.incrCounter("StringCounter", "countername_" + 
+              i, 1);
+        }
+      }    
     }
   }
   
@@ -120,6 +143,12 @@ public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
       System.out.println("Exceeded counter " + 
           StringUtils.stringifyException(re));
     }
+    conf.setBoolean("task.generate.unique.counters", true);
+    FileOutputFormat.setOutputPath(conf, new Path("output-fail"));
+    try {
+      runningJob = JobClient.runJob(conf);
+    } catch(Exception ie) {
+      System.out.println(StringUtils.stringifyException(ie));
+    }
   }
-
 }

+ 11 - 4
src/webapps/job/jobdetails.jsp

@@ -365,10 +365,17 @@
       <th>Total</th>
     </tr>
     <%
-    Counters mapCounters = job.getMapCounters();
-    Counters reduceCounters = job.getReduceCounters();
-    Counters totalCounters = job.getCounters();
-    
+    boolean isFine = true;
+    Counters mapCounters = new Counters();
+    isFine = job.getMapCounters(mapCounters);
+    mapCounters = (isFine? mapCounters: new Counters());
+    Counters reduceCounters = new Counters();
+    isFine = job.getReduceCounters(reduceCounters);
+    reduceCounters = (isFine? reduceCounters: new Counters());
+    Counters totalCounters = new Counters();
+    isFine = job.getCounters(totalCounters);
+    totalCounters = (isFine? totalCounters: new Counters());
+        
     for (String groupName : totalCounters.getGroupNames()) {
       Counters.Group totalGroup = totalCounters.getGroup(groupName);
       Counters.Group mapGroup = mapCounters.getGroup(groupName);