|
@@ -23,10 +23,12 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
+import java.io.InputStreamReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.StringReader;
|
|
|
import java.net.URI;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.jar.JarOutputStream;
|
|
@@ -53,6 +55,8 @@ import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.NullWritable;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
+import org.apache.hadoop.mapred.TaskLog;
|
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
import org.apache.hadoop.mapreduce.JobCounter;
|
|
@@ -65,17 +69,22 @@ import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
|
|
import org.apache.hadoop.mapreduce.TaskID;
|
|
|
import org.apache.hadoop.mapreduce.TaskReport;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
+import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
|
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
|
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
|
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
|
|
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.JarFinder;
|
|
|
import org.apache.hadoop.util.Shell;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.apache.log4j.Level;
|
|
|
import org.junit.AfterClass;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.BeforeClass;
|
|
@@ -84,6 +93,9 @@ import org.junit.Test;
|
|
|
public class TestMRJobs {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
|
|
|
+ private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
|
|
|
+ EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
|
|
|
+ private static final int NUM_NODE_MGRS = 3;
|
|
|
|
|
|
protected static MiniMRYarnCluster mrCluster;
|
|
|
protected static MiniDFSCluster dfsCluster;
|
|
@@ -122,7 +134,8 @@ public class TestMRJobs {
|
|
|
}
|
|
|
|
|
|
if (mrCluster == null) {
|
|
|
- mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
|
|
|
+ mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(),
|
|
|
+ NUM_NODE_MGRS);
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
|
|
@@ -416,6 +429,115 @@ public class TestMRJobs {
|
|
|
// TODO later: add explicit "isUber()" checks of some sort
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+  * Rolling-log regression test: runs a single-map SleepJob with log level
|
|
|
+  * ALL, TASK_USERLOG_LIMIT set to 1, and distinct backup counts for the AM
|
|
|
+  * (7) and the map task (3); waits for the app to reach a terminal RM
|
|
|
+  * state, then globs every NodeManager log dir and asserts each container
|
|
|
+  * directory holds the expected number of syslog* files
|
|
|
+  * (configured backups + 1 active file).
|
|
|
+  */
|
|
|
+ @Test(timeout = 120000)
|
|
|
+ public void testContainerRollingLog() throws IOException,
|
|
|
+ InterruptedException, ClassNotFoundException {
|
|
|
+ // Skip (don't fail) when the MRAppJar hasn't been built in this tree.
|
|
|
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ + " not found. Not running test.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ final SleepJob sleepJob = new SleepJob();
|
|
|
+ final JobConf sleepConf = new JobConf(mrCluster.getConfig());
|
|
|
+ sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
|
|
|
+ sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
|
|
|
+ // Tiny userlog limit forces the logs to roll; backup counts differ on
|
|
|
+ // purpose so AM and task containers are distinguishable by file count.
|
|
|
+ sleepConf.setLong(MRJobConfig.TASK_USERLOG_LIMIT, 1);
|
|
|
+ sleepConf.setInt(MRJobConfig.TASK_LOG_BACKUPS, 3);
|
|
|
+ sleepConf.setInt(MRJobConfig.MR_AM_LOG_BACKUPS, 7);
|
|
|
+ sleepJob.setConf(sleepConf);
|
|
|
+
|
|
|
+ final Job job = sleepJob.createJob(1, 0, 1L, 100, 0L, 0);
|
|
|
+ job.setJarByClass(SleepJob.class);
|
|
|
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
|
|
+ job.waitForCompletion(true);
|
|
|
+ final JobId jobId = TypeConverter.toYarn(job.getJobID());
|
|
|
+ final ApplicationId appID = jobId.getAppId();
|
|
|
+ // Poll RM app state once a second; bail out after 60s even if the app
|
|
|
+ // never reaches a terminal state (the assert below will then fail).
|
|
|
+ int pollElapsed = 0;
|
|
|
+ while (true) {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ pollElapsed += 1000;
|
|
|
+ if (TERMINAL_RM_APP_STATES.contains(
|
|
|
+ mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
|
|
|
+ .getState())) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (pollElapsed >= 60000) {
|
|
|
+ LOG.warn("application did not reach terminal state within 60 seconds");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
|
|
|
+ .getRMContext().getRMApps().get(appID).getState());
|
|
|
+
|
|
|
+ // Job finished, verify logs
|
|
|
+ //
|
|
|
+
|
|
|
+ // Build a glob matching <appId>/container_<suffix>_*_*/syslog under
|
|
|
+ // each NodeManager log dir ("application_" is stripped to form the
|
|
|
+ // container dir name).
|
|
|
+ final String appIdStr = appID.toString();
|
|
|
+ final String appIdSuffix = appIdStr.substring("application_".length(),
|
|
|
+ appIdStr.length());
|
|
|
+ final String containerGlob = "container_" + appIdSuffix + "_*_*";
|
|
|
+ final String syslogGlob = appIdStr
|
|
|
+ + Path.SEPARATOR + containerGlob
|
|
|
+ + Path.SEPARATOR + TaskLog.LogName.SYSLOG;
|
|
|
+ int numAppMasters = 0;
|
|
|
+ int numMapTasks = 0;
|
|
|
+
|
|
|
+ for (int i = 0; i < NUM_NODE_MGRS; i++) {
|
|
|
+ final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
|
|
|
+ for (String logDir :
|
|
|
+ nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
|
|
|
+ final Path absSyslogGlob =
|
|
|
+ new Path(logDir + Path.SEPARATOR + syslogGlob);
|
|
|
+ LOG.info("Checking for glob: " + absSyslogGlob);
|
|
|
+ final FileStatus[] syslogs = localFs.globStatus(absSyslogGlob);
|
|
|
+ for (FileStatus slog : syslogs) {
|
|
|
+ // check all syslogs for the container
|
|
|
+ //
|
|
|
+ final FileStatus[] sysSiblings = localFs.globStatus(new Path(
|
|
|
+ slog.getPath().getParent(), TaskLog.LogName.SYSLOG + "*"));
|
|
|
+ // An AM container is identified by its syslog mentioning the
|
|
|
+ // MRAppMaster class; anything else is counted as a map task.
|
|
|
+ boolean foundAppMaster = false;
|
|
|
+ floop:
|
|
|
+ for (FileStatus f : sysSiblings) {
|
|
|
+ // NOTE(review): InputStreamReader uses the platform default
|
|
|
+ // charset here; specifying UTF-8 would be safer — confirm the
|
|
|
+ // syslog encoding.
|
|
|
+ final BufferedReader reader = new BufferedReader(
|
|
|
+ new InputStreamReader(localFs.open(f.getPath())));
|
|
|
+ String line;
|
|
|
+ try {
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
+ if (line.contains(MRJobConfig.APPLICATION_MASTER_CLASS)) {
|
|
|
+ foundAppMaster = true;
|
|
|
+ break floop;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (foundAppMaster) {
|
|
|
+ numAppMasters++;
|
|
|
+ } else {
|
|
|
+ numMapTasks++;
|
|
|
+ }
|
|
|
+
|
|
|
+ // NOTE(review): assertSame compares boxed Integer identity and only
|
|
|
+ // passes because these small values hit the Integer cache
|
|
|
+ // (-128..127); assertEquals is the correct assertion here.
|
|
|
+ Assert.assertSame("Number of sylog* files",
|
|
|
+ foundAppMaster
|
|
|
+ ? sleepConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, 0) + 1
|
|
|
+ : sleepConf.getInt(MRJobConfig.TASK_LOG_BACKUPS, 0) + 1,
|
|
|
+ sysSiblings.length);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Make sure we checked non-empty set
|
|
|
+ //
|
|
|
+ Assert.assertEquals("No AppMaster log found!", 1, numAppMasters);
|
|
|
+ if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) {
|
|
|
+ Assert.assertEquals("MapTask log with uber found!", 0, numMapTasks);
|
|
|
+ } else {
|
|
|
+ Assert.assertEquals("No MapTask log found!", 1, numMapTasks);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public static class DistributedCacheChecker extends
|
|
|
Mapper<LongWritable, Text, NullWritable, NullWritable> {
|
|
|
|