|
@@ -0,0 +1,244 @@
|
|
|
+/**
|
|
|
+* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+* or more contributor license agreements. See the NOTICE file
|
|
|
+* distributed with this work for additional information
|
|
|
+* regarding copyright ownership. The ASF licenses this file
|
|
|
+* to you under the Apache License, Version 2.0 (the
|
|
|
+* "License"); you may not use this file except in compliance
|
|
|
+* with the License. You may obtain a copy of the License at
|
|
|
+*
|
|
|
+* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+*
|
|
|
+* Unless required by applicable law or agreed to in writing, software
|
|
|
+* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+* See the License for the specific language governing permissions and
|
|
|
+* limitations under the License.
|
|
|
+*/
|
|
|
+
|
|
|
+package org.apache.hadoop.mapreduce.v2;
|
|
|
+
|
|
|
+import java.io.*;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.SleepJob;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.*;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.mapred.*;
|
|
|
+import org.apache.hadoop.mapreduce.*;
|
|
|
+import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+public class TestMRJobsWithProfiler {
|
|
|
+
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestMRJobsWithProfiler.class);
|
|
|
+
|
|
|
+ private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
|
|
|
+ EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
|
|
|
+
|
|
|
+ private static MiniMRYarnCluster mrCluster;
|
|
|
+
|
|
|
+ private static final Configuration CONF = new Configuration();
|
|
|
+ private static final FileSystem localFs;
|
|
|
+ static {
|
|
|
+ try {
|
|
|
+ localFs = FileSystem.getLocal(CONF);
|
|
|
+ } catch (IOException io) {
|
|
|
+ throw new RuntimeException("problem getting local fs", io);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final Path TEST_ROOT_DIR =
|
|
|
+ new Path("target", TestMRJobs.class.getName() + "-tmpDir").
|
|
|
+ makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
|
|
|
+
|
|
|
+ private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setup() throws InterruptedException, IOException {
|
|
|
+
|
|
|
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ + " not found. Not running test.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (mrCluster == null) {
|
|
|
+ mrCluster = new MiniMRYarnCluster(getClass().getName());
|
|
|
+ mrCluster.init(CONF);
|
|
|
+ mrCluster.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
|
|
|
+ // workaround the absent public discache.
|
|
|
+ localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
|
|
|
+ localFs.setPermission(APP_JAR, new FsPermission("700"));
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void tearDown() {
|
|
|
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ + " not found. Not running test.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (mrCluster != null) {
|
|
|
+ mrCluster.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Test (timeout = 120000)
|
|
|
+ public void testProfiler() throws IOException, InterruptedException,
|
|
|
+ ClassNotFoundException {
|
|
|
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ + " not found. Not running test.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ final SleepJob sleepJob = new SleepJob();
|
|
|
+ final JobConf sleepConf = new JobConf(mrCluster.getConfig());
|
|
|
+
|
|
|
+ sleepConf.setProfileEnabled(true);
|
|
|
+ // profile map split 1
|
|
|
+ sleepConf.setProfileTaskRange(true, "1");
|
|
|
+ // profile reduce of map output partitions 1
|
|
|
+ sleepConf.setProfileTaskRange(false, "1");
|
|
|
+
|
|
|
+ // use hprof for map to profile.out
|
|
|
+ sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
|
|
|
+ "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
|
|
|
+ + "file=%s");
|
|
|
+
|
|
|
+ // use Xprof for reduce to stdout
|
|
|
+ sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
|
|
|
+ sleepJob.setConf(sleepConf);
|
|
|
+
|
|
|
+ // 2-map-2-reduce SleepJob
|
|
|
+ final Job job = sleepJob.createJob(2, 2, 500, 1, 500, 1);
|
|
|
+ job.setJarByClass(SleepJob.class);
|
|
|
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
|
|
+ job.waitForCompletion(true);
|
|
|
+ final JobId jobId = TypeConverter.toYarn(job.getJobID());
|
|
|
+ final ApplicationId appID = jobId.getAppId();
|
|
|
+ int pollElapsed = 0;
|
|
|
+ while (true) {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ pollElapsed += 1000;
|
|
|
+
|
|
|
+ if (TERMINAL_RM_APP_STATES.contains(
|
|
|
+ mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
|
|
|
+ .getState())) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (pollElapsed >= 60000) {
|
|
|
+ LOG.warn("application did not reach terminal state within 60 seconds");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
|
|
|
+ .getRMContext().getRMApps().get(appID).getState());
|
|
|
+
|
|
|
+ // Job finished, verify logs
|
|
|
+ //
|
|
|
+ final Configuration nmConf = mrCluster.getNodeManager(0).getConfig();
|
|
|
+
|
|
|
+ final String appIdStr = appID.toString();
|
|
|
+ final String appIdSuffix = appIdStr.substring(
|
|
|
+ "application_".length(), appIdStr.length());
|
|
|
+ final String containerGlob = "container_" + appIdSuffix + "_*_*";
|
|
|
+
|
|
|
+ final Map<TaskAttemptID,Path> taLogDirs = new HashMap<TaskAttemptID,Path>();
|
|
|
+ final Pattern taskPattern = Pattern.compile(
|
|
|
+ ".*Task:(attempt_"
|
|
|
+ + appIdSuffix + "_[rm]_" + "[0-9]+_[0-9]+).*");
|
|
|
+ for (String logDir :
|
|
|
+ nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))
|
|
|
+ {
|
|
|
+ // filter out MRAppMaster and create attemptId->logDir map
|
|
|
+ //
|
|
|
+ for (FileStatus fileStatus :
|
|
|
+ localFs.globStatus(new Path(logDir
|
|
|
+ + Path.SEPARATOR + appIdStr
|
|
|
+ + Path.SEPARATOR + containerGlob
|
|
|
+ + Path.SEPARATOR + TaskLog.LogName.SYSLOG)))
|
|
|
+ {
|
|
|
+ final BufferedReader br = new BufferedReader(
|
|
|
+ new InputStreamReader(localFs.open(fileStatus.getPath())));
|
|
|
+ String line;
|
|
|
+ while ((line = br.readLine()) != null) {
|
|
|
+ final Matcher m = taskPattern.matcher(line);
|
|
|
+ if (m.matches()) {
|
|
|
+ // found Task done message
|
|
|
+ taLogDirs.put(TaskAttemptID.forName(m.group(1)),
|
|
|
+ fileStatus.getPath().getParent());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ br.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals(4, taLogDirs.size()); // all 4 attempts found
|
|
|
+
|
|
|
+ for (Map.Entry<TaskAttemptID,Path> dirEntry : taLogDirs.entrySet()) {
|
|
|
+ final TaskAttemptID tid = dirEntry.getKey();
|
|
|
+ final Path profilePath = new Path(dirEntry.getValue(),
|
|
|
+ TaskLog.LogName.PROFILE.toString());
|
|
|
+ final Path stdoutPath = new Path(dirEntry.getValue(),
|
|
|
+ TaskLog.LogName.STDOUT.toString());
|
|
|
+ if (tid.getTaskType() == TaskType.MAP) {
|
|
|
+ if (tid.getTaskID().getId() == 1) {
|
|
|
+ // verify profile.out
|
|
|
+ final BufferedReader br = new BufferedReader(new InputStreamReader(
|
|
|
+ localFs.open(profilePath)));
|
|
|
+ final String line = br.readLine();
|
|
|
+ Assert.assertTrue("No hprof content found!",
|
|
|
+ line !=null && line.startsWith("JAVA PROFILE"));
|
|
|
+ br.close();
|
|
|
+ Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
|
|
|
+ } else {
|
|
|
+ Assert.assertFalse("hprof file should not exist",
|
|
|
+ localFs.exists(profilePath));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Assert.assertFalse("hprof file should not exist",
|
|
|
+ localFs.exists(profilePath));
|
|
|
+ if (tid.getTaskID().getId() == 1) {
|
|
|
+ final BufferedReader br = new BufferedReader(new InputStreamReader(
|
|
|
+ localFs.open(stdoutPath)));
|
|
|
+ boolean flatProfFound = false;
|
|
|
+ String line;
|
|
|
+ while ((line = br.readLine()) != null) {
|
|
|
+ if (line.startsWith("Flat profile")) {
|
|
|
+ flatProfFound = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ br.close();
|
|
|
+ Assert.assertTrue("Xprof flat profile not found!", flatProfFound);
|
|
|
+ } else {
|
|
|
+ Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|