瀏覽代碼

MAPREDUCE-5875. Make Counter limits consistent across JobClient, MRAppMaster, and YarnChild. (Gera Shegalov via kasha)

(cherry picked from commit ed0e0ef9748ce5b231de677ab41c3035137bbde4)
Karthik Kambatla 10 年之前
父節點
當前提交
7bfd9e068d

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -201,6 +201,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6123. TestCombineFileInputFormat incorrectly starts 2
     MiniDFSCluster instances. (cnauroth)
 
+    MAPREDUCE-5875. Make Counter limits consistent across JobClient, 
+    MRAppMaster, and YarnChild. (Gera Shegalov via kasha)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
@@ -1088,6 +1089,8 @@ public class MRAppMaster extends CompositeService {
 
     // finally set the job classloader
     MRApps.setClassLoader(jobClassLoader, getConfig());
+    // set job classloader if configured
+    Limits.init(getConfig());
 
     if (initFailed) {
       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);

+ 8 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java

@@ -182,15 +182,15 @@ public class Cluster {
   public Job getJob(JobID jobId) throws IOException, InterruptedException {
     JobStatus status = client.getJobStatus(jobId);
     if (status != null) {
-      JobConf conf;
+      final JobConf conf = new JobConf();
+      final Path jobPath = new Path(client.getFilesystemName(),
+          status.getJobFile());
+      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
       try {
-        conf = new JobConf(status.getJobFile());
-      } catch (RuntimeException ex) {
-        // If job file doesn't exist it means we can't find the job
-        if (ex.getCause() instanceof FileNotFoundException) {
-          return null;
-        } else {
-          throw ex;
+        conf.addResource(fs.open(jobPath), jobPath.toString());
+      } catch (FileNotFoundException fnf) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Job conf missing on cluster", fnf);
         }
       }
       return Job.getInstance(this, status, conf);

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.QueueACL;
 
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
+import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
@@ -437,6 +438,7 @@ class JobSubmitter {
 
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
+      Limits.reset(conf);
       
       //
       // Now, actually submit the job (using the submit name)

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java

@@ -123,4 +123,9 @@ public class Limits {
   public synchronized LimitExceededException violation() {
     return firstViolation;
   }
+
+  public static synchronized void reset(Configuration conf) {
+    isInited = false;
+    init(conf);
+  }
 }

+ 17 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.text.Format;
@@ -29,6 +30,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +44,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.util.HostUtil;
@@ -54,7 +58,8 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class HistoryViewer {
-  private static SimpleDateFormat dateFormat = 
+  private static final Log LOG = LogFactory.getLog(HistoryViewer.class);
+  private static final SimpleDateFormat dateFormat =
     new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
   private FileSystem fs;
   private JobInfo job;
@@ -83,6 +88,17 @@ public class HistoryViewer {
         System.err.println("Ignore unrecognized file: " + jobFile.getName());
         throw new IOException(errorMsg);
       }
+      final Path jobConfPath = new Path(jobFile.getParent(),  jobDetails[0]
+          + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml");
+      final Configuration jobConf = new Configuration(conf);
+      try {
+        jobConf.addResource(fs.open(jobConfPath), jobConfPath.toString());
+        Limits.reset(conf);
+      } catch (FileNotFoundException fnf) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Missing job conf in history", fnf);
+        }
+      }
       JobHistoryParser parser = new JobHistoryParser(fs, jobFile);
       job = parser.parse();
       jobId = job.getJobId().toString();

+ 15 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -41,6 +43,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -331,9 +334,21 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
     if (historyFileAbsolute != null) {
       JobHistoryParser parser = null;
       try {
+        final FileSystem fs = historyFileAbsolute.getFileSystem(conf);
         parser =
             new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
                 historyFileAbsolute);
+        final Path jobConfPath = new Path(historyFileAbsolute.getParent(),
+            JobHistoryUtils.getIntermediateConfFileName(jobId));
+        final Configuration conf = new Configuration();
+        try {
+          conf.addResource(fs.open(jobConfPath), jobConfPath.toString());
+          Limits.reset(conf);
+        } catch (FileNotFoundException fnf) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Missing job conf in history", fnf);
+          }
+        }
         this.jobInfo = parser.parse();
       } catch (IOException e) {
         throw new YarnRuntimeException("Could not load history file "

+ 74 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -53,10 +53,14 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
@@ -105,6 +109,7 @@ public class TestMRJobs {
       EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
   private static final int NUM_NODE_MGRS = 3;
   private static final String TEST_IO_SORT_MB = "11";
+  private static final String TEST_GROUP_MAX = "200";
 
   protected static MiniMRYarnCluster mrCluster;
   protected static MiniDFSCluster dfsCluster;
@@ -213,31 +218,58 @@ public class TestMRJobs {
   }
 
   @Test(timeout = 300000)
-  public void testJobClassloader() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    testJobClassloader(false);
+  public void testConfVerificationWithClassloader() throws Exception {
+    testConfVerification(true, false, false, false);
   }
 
   @Test(timeout = 300000)
-  public void testJobClassloaderWithCustomClasses() throws IOException,
-      InterruptedException, ClassNotFoundException {
-    testJobClassloader(true);
+  public void testConfVerificationWithClassloaderCustomClasses()
+      throws Exception {
+    testConfVerification(true, true, false, false);
   }
 
-  private void testJobClassloader(boolean useCustomClasses) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    LOG.info("\n\n\nStarting testJobClassloader()"
-        + " useCustomClasses=" + useCustomClasses);
+  @Test(timeout = 300000)
+  public void testConfVerificationWithOutClassloader() throws Exception {
+    testConfVerification(false, false, false, false);
+  }
+
+  @Test(timeout = 300000)
+  public void testConfVerificationWithJobClient() throws Exception {
+    testConfVerification(false, false, true, false);
+  }
+
+  @Test(timeout = 300000)
+  public void testConfVerificationWithJobClientLocal() throws Exception {
+    testConfVerification(false, false, true, true);
+  }
+
+  private void testConfVerification(boolean useJobClassLoader,
+      boolean useCustomClasses, boolean useJobClientForMonitring,
+      boolean useLocal) throws Exception {
+    LOG.info("\n\n\nStarting testConfVerification()"
+        + " jobClassloader=" + useJobClassLoader
+        + " customClasses=" + useCustomClasses
+        + " jobClient=" + useJobClientForMonitring
+        + " localMode=" + useLocal);
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
                + " not found. Not running test.");
       return;
     }
-    final Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    final Configuration clusterConfig;
+    if (useLocal) {
+      clusterConfig = new Configuration();
+      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+    } else {
+      clusterConfig = mrCluster.getConfig();
+    }
+    final JobClient jc = new JobClient(clusterConfig);
+    final Configuration sleepConf = new Configuration(clusterConfig);
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
-    sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+    sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER,
+        useJobClassLoader);
     if (useCustomClasses) {
       // to test AM loading user classes such as output format class, we want
       // to blacklist them from the system classes (they need to be prepended
@@ -255,6 +287,7 @@ public class TestMRJobs {
     sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");
+    sleepConf.set(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TEST_GROUP_MAX);
     final SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(sleepConf);
     final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
@@ -272,7 +305,26 @@ public class TestMRJobs {
       jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
     }
     job.submit();
-    boolean succeeded = job.waitForCompletion(true);
+    final boolean succeeded;
+    if (useJobClientForMonitring && !useLocal) {
+      // We can't use getJobID in useLocal case because JobClient and Job
+      // point to different instances of LocalJobRunner
+      //
+      final JobID mapredJobID = JobID.downgrade(job.getJobID());
+      RunningJob runningJob = null;
+      do {
+        Thread.sleep(10);
+        runningJob = jc.getJob(mapredJobID);
+      } while (runningJob == null);
+      Assert.assertEquals("Unexpected RunningJob's "
+          + MRJobConfig.COUNTER_GROUPS_MAX_KEY,
+          TEST_GROUP_MAX, runningJob.getConfiguration()
+              .get(MRJobConfig.COUNTER_GROUPS_MAX_KEY));
+      runningJob.waitForCompletion();
+      succeeded = runningJob.isSuccessful();
+    } else {
+      succeeded = job.waitForCompletion(true);
+    }
     Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
         succeeded);
   }
@@ -925,5 +977,14 @@ public class TestMRJobs {
             + ", actual: "  + ioSortMb);
       }
     }
+
+    @Override
+    public void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
+      super.map(key, value, context);
+      for (int i = 0; i < 100; i++) {
+        context.getCounter("testCounterGroup-" + i,
+            "testCounter").increment(1);
+      }
+    }
   }
 }