
YARN-2556. Tool to measure the performance of the timeline server (Chang Li via sjlee)

(cherry picked from commit 58590fef49bf45fc97c81277560e08da6b753f95)
Sangjin Lee committed 9 years ago
commit 180efe6677

+ 53 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileParser.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+
+class JobHistoryFileParser {
+  private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
+
+  private final FileSystem fs;
+
+  public JobHistoryFileParser(FileSystem fs) {
+    LOG.info("JobHistoryFileParser created with " + fs);
+    this.fs = fs;
+  }
+
+  public JobInfo parseHistoryFile(Path path) throws IOException {
+    LOG.info("parsing job history file " + path);
+    JobHistoryParser parser = new JobHistoryParser(fs, path);
+    return parser.parse();
+  }
+
+  public Configuration parseConfiguration(Path path) throws IOException {
+    LOG.info("parsing job configuration file " + path);
+    Configuration conf = new Configuration(false);
+    conf.addResource(fs.open(path));
+    return conf;
+  }
+}
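For orientation, a minimal sketch of how JobHistoryFileParser might be exercised on its own. The local FileSystem and the file names below are illustrative assumptions, not part of this patch, and exception handling is omitted:

    // Hypothetical usage sketch: parse a history file and its matching conf file.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);                   // assumed local filesystem
    JobHistoryFileParser parser = new JobHistoryFileParser(fs);
    JobInfo jobInfo = parser.parseHistoryFile(
        new Path("/tmp/history/job_1442429250_0007.jhist"));     // hypothetical path
    Configuration jobConf = parser.parseConfiguration(
        new Path("/tmp/history/job_1442429250_0007_conf.xml"));  // hypothetical path
    System.out.println(jobInfo.getJobId() + " has "
        + jobInfo.getAllTasks().size() + " tasks");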

+ 196 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayHelper.java

@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+class JobHistoryFileReplayHelper {
+  private static final Log LOG =
+      LogFactory.getLog(JobHistoryFileReplayHelper.class);
+  static final String PROCESSING_PATH = "processing path";
+  static final String REPLAY_MODE = "replay mode";
+  static final int WRITE_ALL_AT_ONCE = 1;
+  static final int WRITE_PER_ENTITY = 2;
+  static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
+
+  private static Pattern JOB_ID_PARSER =
+      Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
+  private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN };
+  JobHistoryFileParser parser;
+  int replayMode;
+  Collection<JobFiles> jobFiles;
+
+  JobHistoryFileReplayHelper(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    int size = conf.getInt(MRJobConfig.NUM_MAPS,
+        TimelineServicePerformance.NUM_MAPS_DEFAULT);
+    replayMode = conf.getInt(JobHistoryFileReplayHelper.REPLAY_MODE,
+            JobHistoryFileReplayHelper.REPLAY_MODE_DEFAULT);
+    String processingDir =
+        conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
+
+    Path processingPath = new Path(processingDir);
+    FileSystem processingFs = processingPath.getFileSystem(conf);
+    parser = new JobHistoryFileParser(processingFs);
+    jobFiles = selectJobFiles(processingFs, processingPath, taskId, size);
+  }
+
+  public int getReplayMode() {
+    return replayMode;
+  }
+
+  public Collection<JobFiles> getJobFiles() {
+    return jobFiles;
+  }
+
+  public JobHistoryFileParser getParser() {
+    return parser;
+  }
+
+  public static class JobFiles {
+    private final String jobId;
+    private Path jobHistoryFilePath;
+    private Path jobConfFilePath;
+
+    public JobFiles(String jobId) {
+      this.jobId = jobId;
+    }
+
+    public String getJobId() {
+      return jobId;
+    }
+
+    public Path getJobHistoryFilePath() {
+      return jobHistoryFilePath;
+    }
+
+    public void setJobHistoryFilePath(Path jobHistoryFilePath) {
+      this.jobHistoryFilePath = jobHistoryFilePath;
+    }
+
+    public Path getJobConfFilePath() {
+      return jobConfFilePath;
+    }
+
+    public void setJobConfFilePath(Path jobConfFilePath) {
+      this.jobConfFilePath = jobConfFilePath;
+    }
+
+    @Override
+    public int hashCode() {
+      return jobId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      JobFiles other = (JobFiles) obj;
+      return jobId.equals(other.jobId);
+    }
+  }
+
+  private Collection<JobFiles> selectJobFiles(FileSystem fs,
+      Path processingRoot, int i, int size) throws IOException {
+    Map<String, JobFiles> jobs = new HashMap<>();
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
+    while (it.hasNext()) {
+      LocatedFileStatus status = it.next();
+      Path path = status.getPath();
+      String fileName = path.getName();
+      Matcher m = JOB_ID_PARSER.matcher(fileName);
+      if (!m.matches()) {
+        continue;
+      }
+      String jobId = m.group(1);
+      int lastId = Integer.parseInt(m.group(2));
+      int mod = lastId % size;
+      if (mod != i) {
+        continue;
+      }
+      LOG.info("this mapper will process file " + fileName);
+      // it's mine
+      JobFiles jobFiles = jobs.get(jobId);
+      if (jobFiles == null) {
+        jobFiles = new JobFiles(jobId);
+        jobs.put(jobId, jobFiles);
+      }
+      setFilePath(fileName, path, jobFiles);
+    }
+    return jobs.values();
+  }
+
+  private void setFilePath(String fileName, Path path,
+    JobFiles jobFiles) {
+    // determine if we're dealing with a job history file or a job conf file
+    FileType type = getFileType(fileName);
+    switch (type) {
+    case JOB_HISTORY_FILE:
+      if (jobFiles.getJobHistoryFilePath() == null) {
+        jobFiles.setJobHistoryFilePath(path);
+      } else {
+        LOG.warn("we already have the job history file " +
+            jobFiles.getJobHistoryFilePath() + ": skipping " + path);
+      }
+      break;
+    case JOB_CONF_FILE:
+      if (jobFiles.getJobConfFilePath() == null) {
+        jobFiles.setJobConfFilePath(path);
+      } else {
+        LOG.warn("we already have the job conf file " +
+            jobFiles.getJobConfFilePath() + ": skipping " + path);
+      }
+      break;
+    case UNKNOWN:
+      LOG.warn("unknown type: " + path);
+    }
+  }
+
+  private FileType getFileType(String fileName) {
+    if (fileName.endsWith(".jhist")) {
+      return FileType.JOB_HISTORY_FILE;
+    }
+    if (fileName.endsWith("_conf.xml")) {
+      return FileType.JOB_CONF_FILE;
+    }
+    return FileType.UNKNOWN;
+  }
+}
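The selection logic above shards work across mappers using the JOB_ID_PARSER regex and a modulo over the job serial number: a mapper keeps a file only when the serial modulo the number of mappers equals its own task index. A small illustration with a made-up file name and an assumed 4-mapper run:

    // Hypothetical illustration of the selectJobFiles() partitioning.
    Pattern p = Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
    Matcher m = p.matcher("job_1442429250_0007_conf.xml");  // made-up file name
    if (m.matches()) {
      String jobId = m.group(1);                  // "job_1442429250_0007"
      int serial = Integer.parseInt(m.group(2));  // 7
      int owner = serial % 4;                     // with 4 mappers, mapper 3 claims this job
    }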

+ 158 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper;
+import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+
+/**
+ * Mapper for TimelineServicePerformance that replays job history files to the
+ * timeline service (v1).
+ *
+ */
+class JobHistoryFileReplayMapperV1 extends
+    org.apache.hadoop.mapreduce.
+        Mapper<IntWritable,IntWritable,Writable,Writable> {
+  private static final Log LOG =
+      LogFactory.getLog(JobHistoryFileReplayMapperV1.class);
+
+  public void map(IntWritable key, IntWritable val, Context context) throws IOException {
+    // collect the apps it needs to process
+    TimelineClient tlc = new TimelineClientImpl();
+    TimelineEntityConverterV1 converter = new TimelineEntityConverterV1();
+    JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
+    int replayMode = helper.getReplayMode();
+    Collection<JobFiles> jobs =
+        helper.getJobFiles();
+    JobHistoryFileParser parser = helper.getParser();
+
+    if (jobs.isEmpty()) {
+      LOG.info(context.getTaskAttemptID().getTaskID() +
+          " will process no jobs");
+    } else {
+      LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
+          jobs.size() + " jobs");
+    }
+    for (JobFiles job: jobs) {
+      // process each job
+      String jobIdStr = job.getJobId();
+      LOG.info("processing " + jobIdStr + "...");
+      JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
+      ApplicationId appId = jobId.getAppId();
+
+      try {
+        // parse the job info and configuration
+        Path historyFilePath = job.getJobHistoryFilePath();
+        Path confFilePath = job.getJobConfFilePath();
+        if ((historyFilePath == null) || (confFilePath == null)) {
+          continue;
+        }
+        JobInfo jobInfo =
+            parser.parseHistoryFile(historyFilePath);
+        Configuration jobConf =
+            parser.parseConfiguration(confFilePath);
+        LOG.info("parsed the job history file and the configuration file for job "
+            + jobIdStr);
+
+        // create entities from job history and write them
+        long totalTime = 0;
+        Set<TimelineEntity> entitySet =
+            converter.createTimelineEntities(jobInfo, jobConf);
+        LOG.info("converted them into timeline entities for job " + jobIdStr);
+        // use the current user for this purpose
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long startWrite = System.nanoTime();
+        try {
+          switch (replayMode) {
+          case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE:
+            writeAllEntities(tlc, entitySet, ugi);
+            break;
+          case JobHistoryFileReplayHelper.WRITE_PER_ENTITY:
+            writePerEntity(tlc, entitySet, ugi);
+            break;
+          default:
+            break;
+          }
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+              increment(1);
+          LOG.error("writing to the timeline service failed", e);
+        }
+        long endWrite = System.nanoTime();
+        totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+        int numEntities = entitySet.size();
+        LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
+
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+            increment(totalTime);
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+            increment(numEntities);
+      } finally {
+        context.progress(); // move it along
+      }
+    }
+  }
+
+  private void writeAllEntities(TimelineClient tlc,
+      Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException, YarnException {
+    tlc.putEntities(entitySet.toArray(new TimelineEntity[entitySet.size()]));
+  }
+
+  private void writePerEntity(TimelineClient tlc,
+      Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+      throws IOException, YarnException {
+    for (TimelineEntity entity : entitySet) {
+      tlc.putEntities(entity);
+      LOG.info("wrote entity " + entity.getEntityId());
+    }
+  }
+}

+ 120 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Adds simple entities with a random string payload and events.
+ */
+class SimpleEntityWriterV1 extends
+    org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+  private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
+
+  // constants for mtype = 1
+  static final String KBS_SENT = "kbs sent";
+  static final int KBS_SENT_DEFAULT = 1;
+  static final String TEST_TIMES = "testtimes";
+  static final int TEST_TIMES_DEFAULT = 100;
+  static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
+      "timeline.server.performance.run.id";
+  /**
+   * To ensure that the compression really gets exercised, generate a
+   * random alphanumeric fixed-length payload.
+   */
+  private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
+    'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
+    's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
+    'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+    'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
+    '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
+
+  public void map(IntWritable key, IntWritable val, Context context) throws IOException {
+    TimelineClient tlc = new TimelineClientImpl();
+    Configuration conf = context.getConfiguration();
+
+    final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
+
+    long totalTime = 0;
+    final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
+    final Random rand = new Random();
+    final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+    final char[] payLoad = new char[kbs * 1024];
+
+    for (int i = 0; i < testtimes; i++) {
+      // Generate a fixed length random payload
+      for (int xx = 0; xx < kbs * 1024; xx++) {
+        int alphaNumIdx =
+            rand.nextInt(ALPHA_NUMS.length);
+        payLoad[xx] = ALPHA_NUMS[alphaNumIdx];
+      }
+      String entId = taskAttemptId + "_" + Integer.toString(i);
+      final TimelineEntity entity = new TimelineEntity();
+      entity.setEntityId(entId);
+      entity.setEntityType("FOO_ATTEMPT");
+      entity.addOtherInfo("PERF_TEST", payLoad);
+      // add an event
+      TimelineEvent event = new TimelineEvent();
+      event.setTimestamp(System.currentTimeMillis());
+      event.setEventType("foo_event");
+      entity.addEvent(event);
+
+      // use the current user for this purpose
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      long startWrite = System.nanoTime();
+      try {
+        tlc.putEntities(entity);
+      } catch (Exception e) {
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+            increment(1);
+        LOG.error("writing to the timeline service failed", e);
+      }
+      long endWrite = System.nanoTime();
+      totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
+    }
+    LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
+        " kB) in " + totalTime + " ms");
+    context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+        increment(totalTime);
+    context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+        increment(testtimes);
+    context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
+        increment(kbs*testtimes);
+  }
+}
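To put the emitted counters in perspective, a back-of-the-envelope calculation with assumed (non-default) flags:

    // Assumed flags for the example: -s 10 -t 1000
    // entities written per mapper = testtimes       = 1,000 (one put per entity)
    // payload written per mapper  = kbs * testtimes = 10 KB * 1,000 = 10,000 KB (~10 MB)
    // TIMELINE_SERVICE_WRITE_KBS therefore grows by 10,000 for that mapper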

+ 167 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java

@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+
+class TimelineEntityConverterV1 {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineEntityConverterV1.class);
+
+  static final String JOB = "MAPREDUCE_JOB";
+  static final String TASK = "MAPREDUCE_TASK";
+  static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
+
+  /**
+   * Creates job, task, and task attempt entities based on the job history info
+   * and configuration.
+   *
+ * Note: currently these are plain timeline entities created for mapreduce
+   * types. These are not meant to be the complete and accurate entity set-up
+   * for mapreduce jobs. We do not leverage hierarchical timeline entities. If
+   * we create canonical mapreduce hierarchical timeline entities with proper
+   * parent-child relationship, we could modify this to use that instead.
+   *
+   * Note that we also do not add info to the YARN application entity, which
+   * would be needed for aggregation.
+   */
+  public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
+      Configuration conf) {
+    Set<TimelineEntity> entities = new HashSet<>();
+
+    // create the job entity
+    TimelineEntity job = createJobEntity(jobInfo, conf);
+    entities.add(job);
+
+    // create the task and task attempt entities
+    Set<TimelineEntity> tasksAndAttempts =
+        createTaskAndTaskAttemptEntities(jobInfo);
+    entities.addAll(tasksAndAttempts);
+
+    return entities;
+  }
+
+  private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
+    TimelineEntity job = new TimelineEntity();
+    job.setEntityType(JOB);
+    job.setEntityId(jobInfo.getJobId().toString());
+    job.setStartTime(jobInfo.getSubmitTime());
+
+    job.addPrimaryFilter("JOBNAME", jobInfo.getJobname());
+    job.addPrimaryFilter("USERNAME", jobInfo.getUsername());
+    job.addOtherInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
+    job.addOtherInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
+    job.addOtherInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
+    job.addOtherInfo("FINISH_TIME", jobInfo.getFinishTime());
+    job.addOtherInfo("JOB_STATUS", jobInfo.getJobStatus());
+    job.addOtherInfo("PRIORITY", jobInfo.getPriority());
+    job.addOtherInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
+    job.addOtherInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
+    job.addOtherInfo("UBERIZED", jobInfo.getUberized());
+    job.addOtherInfo("ERROR_INFO", jobInfo.getErrorInfo());
+
+    LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
+    return job;
+  }
+
+  private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
+    Set<TimelineEntity> entities = new HashSet<>();
+    Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
+    LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
+        " tasks");
+    for (TaskInfo taskInfo: taskInfoMap.values()) {
+      TimelineEntity task = createTaskEntity(taskInfo);
+      entities.add(task);
+      // add the task attempts from this task
+      Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
+      entities.addAll(taskAttempts);
+    }
+    return entities;
+  }
+
+  private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
+    TimelineEntity task = new TimelineEntity();
+    task.setEntityType(TASK);
+    task.setEntityId(taskInfo.getTaskId().toString());
+    task.setStartTime(taskInfo.getStartTime());
+
+    task.addOtherInfo("START_TIME", taskInfo.getStartTime());
+    task.addOtherInfo("FINISH_TIME", taskInfo.getFinishTime());
+    task.addOtherInfo("TASK_TYPE", taskInfo.getTaskType());
+    task.addOtherInfo("TASK_STATUS", taskInfo.getTaskStatus());
+    task.addOtherInfo("ERROR_INFO", taskInfo.getError());
+
+    LOG.info("converted task " + taskInfo.getTaskId() +
+        " to a timeline entity");
+    return task;
+  }
+
+  private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
+    Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
+    Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
+        taskInfo.getAllTaskAttempts();
+    LOG.info("task " + taskInfo.getTaskId() + " has " +
+        taskAttemptInfoMap.size() + " task attempts");
+    for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
+      TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
+      taskAttempts.add(taskAttempt);
+    }
+    return taskAttempts;
+  }
+
+  private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
+    TimelineEntity taskAttempt = new TimelineEntity();
+    taskAttempt.setEntityType(TASK_ATTEMPT);
+    taskAttempt.setEntityId(taskAttemptInfo.getAttemptId().toString());
+    taskAttempt.setStartTime(taskAttemptInfo.getStartTime());
+
+    taskAttempt.addOtherInfo("START_TIME", taskAttemptInfo.getStartTime());
+    taskAttempt.addOtherInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
+    taskAttempt.addOtherInfo("MAP_FINISH_TIME",
+        taskAttemptInfo.getMapFinishTime());
+    taskAttempt.addOtherInfo("SHUFFLE_FINISH_TIME",
+        taskAttemptInfo.getShuffleFinishTime());
+    taskAttempt.addOtherInfo("SORT_FINISH_TIME",
+        taskAttemptInfo.getSortFinishTime());
+    taskAttempt.addOtherInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
+    taskAttempt.addOtherInfo("STATE", taskAttemptInfo.getState());
+    taskAttempt.addOtherInfo("ERROR", taskAttemptInfo.getError());
+    taskAttempt.addOtherInfo("CONTAINER_ID",
+        taskAttemptInfo.getContainerId().toString());
+
+    LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
+        " to a timeline entity");
+    return taskAttempt;
+  }
+}
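Once a replay run (mtype=2) completes, the entities produced by this converter should be queryable through the timeline server's v1 REST API. A hedged example, assuming the default webapp port 8188 and an otherwise stock configuration:

    # Illustrative queries, not part of the patch
    curl http://<timeline-host>:8188/ws/v1/timeline/MAPREDUCE_JOB
    curl http://<timeline-host>:8188/ws/v1/timeline/MAPREDUCE_TASK?limit=10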

+ 197 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class TimelineServicePerformance extends Configured implements Tool {
+  static final int NUM_MAPS_DEFAULT = 1;
+
+  static final int SIMPLE_ENTITY_WRITER = 1;
+  static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
+  static int mapperType = SIMPLE_ENTITY_WRITER;
+  static final int TIMELINE_SERVICE_VERSION_1 = 1;
+  static final int TIMELINE_SERVICE_VERSION_2 = 2;
+  static int timeline_service_version = TIMELINE_SERVICE_VERSION_1;
+
+  protected static int printUsage() {
+    System.err.println(
+        "Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
+            ")\n" +
+        "     [-v <version>] timeline service version\n" +
+        "     [-mtype <mapper type in integer>]\n" +
+        "          1. simple entity write mapper\n" +
+        "          2. jobhistory files replay mapper\n" +
+        "     [-s <KBs>] number of KB per put (mtype=1, default: " +
+             SimpleEntityWriterV1.KBS_SENT_DEFAULT + " KB)\n" +
+        "     [-t <times>] number of put iterations per mapper (mtype=1, default: " +
+             SimpleEntityWriterV1.TEST_TIMES_DEFAULT + ")\n" +
+        "     [-d <path>] root path of job history files (mtype=2)\n" +
+        "     [-r <replay mode>] (mtype=2)\n" +
+        "          1. write all entities for a job in one put (default)\n" +
+        "          2. write one entity at a time\n");
+    GenericOptionsParser.printGenericCommandUsage(System.err);
+    return -1;
+  }
+
+  /**
+   * Configure a job given argv.
+   */
+  public static boolean parseArgs(String[] args, Job job) throws IOException {
+    // set the common defaults
+    Configuration conf = job.getConfiguration();
+    conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
+
+    for (int i = 0; i < args.length; i++) {
+      if (args.length == i + 1) {
+        System.out.println("ERROR: Required parameter missing from " + args[i]);
+        return printUsage() == 0;
+      }
+      try {
+        if ("-v".equals(args[i])) {
+          timeline_service_version = Integer.parseInt(args[++i]);
+        } else if ("-m".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            job.getConfiguration()
+                .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
+          }
+        } else if ("-mtype".equals(args[i])) {
+          mapperType = Integer.parseInt(args[++i]);
+        } else if ("-s".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            conf.setInt(SimpleEntityWriterV1.KBS_SENT, Integer.parseInt(args[i]));
+          }
+        } else if ("-t".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            conf.setInt(SimpleEntityWriterV1.TEST_TIMES,
+                Integer.parseInt(args[i]));
+          }
+        } else if ("-d".equals(args[i])) {
+          conf.set(JobHistoryFileReplayHelper.PROCESSING_PATH, args[++i]);
+        } else if ("-r".equals(args[i])) {
+          conf.setInt(JobHistoryFileReplayHelper.REPLAY_MODE,
+          Integer.parseInt(args[++i]));
+        } else {
+          System.out.println("Unexpected argument: " + args[i]);
+          return printUsage() == 0;
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage() == 0;
+      } catch (Exception e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+    }
+
+    // handle mapper-specific settings
+    switch (timeline_service_version) {
+    case TIMELINE_SERVICE_VERSION_1:
+    default:
+      switch (mapperType) {
+      case JOB_HISTORY_FILE_REPLAY_MAPPER:
+        job.setMapperClass(JobHistoryFileReplayMapperV1.class);
+        String processingPath =
+            conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
+        if (processingPath == null || processingPath.isEmpty()) {
+          System.out.println("processing path (-d) is required when mtype = 2");
+          return printUsage() == 0;
+        }
+        break;
+      case SIMPLE_ENTITY_WRITER:
+      default:
+        job.setMapperClass(SimpleEntityWriterV1.class);
+        // use the current timestamp as the "run id" of the test: this will
+        // be used to simulate the cluster timestamp for apps
+        conf.setLong(SimpleEntityWriterV1.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
+          System.currentTimeMillis());
+        break;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * TimelineServer Performance counters
+   */
+  static enum PerfCounters {
+    TIMELINE_SERVICE_WRITE_TIME,
+    TIMELINE_SERVICE_WRITE_COUNTER,
+    TIMELINE_SERVICE_WRITE_FAILURES,
+    TIMELINE_SERVICE_WRITE_KBS,
+  }
+
+  public int run(String[] args) throws Exception {
+
+    Job job = Job.getInstance(getConf());
+    job.setJarByClass(TimelineServicePerformance.class);
+    job.setMapperClass(SimpleEntityWriterV1.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setNumReduceTasks(0);
+    if (!parseArgs(args, job)) {
+      return -1;
+    }
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
+    long writetime =
+        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
+    long writecounts =
+        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
+    long writesize =
+        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
+    double transacrate = writecounts * 1000 / (double)writetime;
+    double iorate = writesize * 1000 / (double)writetime;
+    int numMaps =
+        Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
+
+    System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
+        " ops/s");
+    System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
+
+    System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps +
+        " ops/s");
+    System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
+
+    return ret;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res =
+        ToolRunner.run(new Configuration(), new TimelineServicePerformance(),
+            args);
+    System.exit(res);
+  }
+
+}
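To make the reported rates concrete, a worked example with illustrative counter values (not measured output):

    // writecounts = 1,000 entities, writetime = 5,000 ms, writesize = 2,000 KB
    // TRANSACTION RATE (per mapper) = writecounts * 1000 / writetime = 200 ops/s
    // IO RATE (per mapper)          = writesize * 1000 / writetime   = 400 KB/s
    // the totals are simply the per-mapper rates multiplied by the number of mappers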

+ 23 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.TestMapRed;
 import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.ThreadedMapBenchmark;
+import org.apache.hadoop.mapreduce.TimelineServicePerformance;
 import org.apache.hadoop.mapreduce.FailJob;
 import org.apache.hadoop.mapreduce.LargeSorter;
 import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
@@ -55,60 +56,62 @@ import org.apache.hadoop.fs.slive.SliveTest;
 public class MapredTestDriver {
 
   private ProgramDriver pgd;
-  
+
   public MapredTestDriver() {
     this(new ProgramDriver());
   }
-  
+
   public MapredTestDriver(ProgramDriver pgd) {
     this.pgd = pgd;
     try {
-      pgd.addClass("testsequencefile", TestSequenceFile.class, 
+      pgd.addClass("testsequencefile", TestSequenceFile.class,
       "A test for flat files of binary key value pairs.");
-      pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, 
-          "A map/reduce benchmark that compares the performance " + 
+      pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class,
+          "A map/reduce benchmark that compares the performance " +
           "of maps with multiple spills over maps with 1 spill");
-      pgd.addClass("mrbench", MRBench.class, 
+      pgd.addClass("mrbench", MRBench.class,
           "A map/reduce benchmark that can create many small jobs");
       pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");
-      pgd.addClass("testsequencefileinputformat", 
-          TestSequenceFileInputFormat.class, 
+      pgd.addClass("testsequencefileinputformat",
+          TestSequenceFileInputFormat.class,
           "A test for sequence file input format.");
-      pgd.addClass("testtextinputformat", TestTextInputFormat.class, 
+      pgd.addClass("testtextinputformat", TestTextInputFormat.class,
           "A test for text input format.");
-      pgd.addClass("testmapredsort", SortValidator.class, 
+      pgd.addClass("testmapredsort", SortValidator.class,
           "A map/reduce program that validates the " +
           "map-reduce framework's sort.");
-      pgd.addClass("testbigmapoutput", BigMapOutput.class, 
+      pgd.addClass("testbigmapoutput", BigMapOutput.class,
           "A map/reduce program that works on a very big " +
           "non-splittable file and does identity map/reduce");
-      pgd.addClass("loadgen", GenericMRLoadGenerator.class, 
+      pgd.addClass("loadgen", GenericMRLoadGenerator.class,
           "Generic map/reduce load generator");
       pgd.addClass("MRReliabilityTest", ReliabilityTest.class,
           "A program that tests the reliability of the MR framework by " +
           "injecting faults/failures");
       pgd.addClass("fail", FailJob.class, "a job that always fails");
-      pgd.addClass("sleep", SleepJob.class, 
+      pgd.addClass("sleep", SleepJob.class,
                    "A job that sleeps at each map and reduce task.");
-      pgd.addClass("nnbench", NNBench.class, 
+      pgd.addClass("timelineperformance", TimelineServicePerformance.class,
+                   "A job that launches mappers to test timeline server performance.");
+      pgd.addClass("nnbench", NNBench.class,
           "A benchmark that stresses the namenode w/ MR.");
       pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class,
           "A benchmark that stresses the namenode w/o MR.");
-      pgd.addClass("testfilesystem", TestFileSystem.class, 
+      pgd.addClass("testfilesystem", TestFileSystem.class,
           "A test for FileSystem read/write.");
-      pgd.addClass(TestDFSIO.class.getSimpleName(), TestDFSIO.class, 
+      pgd.addClass(TestDFSIO.class.getSimpleName(), TestDFSIO.class,
           "Distributed i/o benchmark.");
       pgd.addClass("DFSCIOTest", DFSCIOTest.class, "" +
           "Distributed i/o benchmark of libhdfs.");
-      pgd.addClass("DistributedFSCheck", DistributedFSCheck.class, 
+      pgd.addClass("DistributedFSCheck", DistributedFSCheck.class,
           "Distributed checkup of the file system consistency.");
-      pgd.addClass("filebench", FileBench.class, 
+      pgd.addClass("filebench", FileBench.class,
           "Benchmark SequenceFile(Input|Output)Format " +
           "(block,record compressed and uncompressed), " +
           "Text(Input|Output)Format (compressed and uncompressed)");
-      pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class, 
+      pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class,
           "Job History Log analyzer.");
-      pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, 
+      pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class,
           "HDFS Stress Test and Live Data Verification.");
       pgd.addClass("minicluster", MiniHadoopClusterManager.class,
       "Single process HDFS and MR cluster.");

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -177,6 +177,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3964. Support NodeLabelsProvider at Resource Manager side.
     (Dian Fu via devaraj)
 
+    YARN-2556. Tool to measure the performance of the timeline server (Chang Li
+    via sjlee)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before