
commit c6c4f767f29eefadf090507e619a10a4c4993d7e
Author: Vinay Kumar Thota <vinayt@yahoo-inc.com>
Date: Thu Jul 22 15:57:07 2010 +0000

MAPREDUCE:1898 from https://issues.apache.org/jira/secure/attachment/12450147/1898-ydist-security.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077578 13f79535-47bb-0310-9956-ffa450edef68

Committer: Owen O'Malley
Parent: edeb4729ef

+ 10 - 0
src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj

@@ -82,4 +82,14 @@ public aspect JTProtocolAspect {
  public boolean JTProtocol.isBlackListed(String trackerID) throws IOException {
    return false;
  }
+  
+  public String JTProtocol.getJobSummaryFromLog(JobID jobId, 
+      String filePattern) throws IOException {
+    return null;
+  }
+
+  public String JTProtocol.getJobSummaryInfo(JobID jobId) throws IOException {
+    return null;
+  }
+
}

+ 41 - 2
src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj

@@ -35,8 +35,9 @@ privileged aspect JobInProgressAspect {
   */
  public JobInfo JobInProgress.getJobInfo() {
    String historyLoc = getHistoryPath();
+    JobInfoImpl jobInfoImpl;
    if (tasksInited.get()) {
-      return new JobInfoImpl(
+      jobInfoImpl = new JobInfoImpl(
          this.getJobID(), this.isSetupLaunched(), this.isSetupFinished(), this
              .isCleanupLaunched(), this.runningMaps(), this.runningReduces(),
          this.pendingMaps(), this.pendingReduces(), this.finishedMaps(), this
@@ -44,15 +45,53 @@ privileged aspect JobInProgressAspect {
              .getBlackListedTrackers(), false, this.numMapTasks,
          this.numReduceTasks, this.isHistoryFileCopied());
    } else {
-      return new JobInfoImpl(
+      jobInfoImpl = new JobInfoImpl(
          this.getJobID(), false, false, false, 0, 0, this.pendingMaps(), this
              .pendingReduces(), this.finishedMaps(), this.finishedReduces(),
          this.getStatus(), historyLoc, this.getBlackListedTrackers(), this
              .isComplete(), this.numMapTasks, this.numReduceTasks, 
              this.isHistoryFileCopied());
    }
+    jobInfoImpl.setFinishTime(getJobFinishTime());
+    jobInfoImpl.setLaunchTime(getJobLaunchTime());
+    jobInfoImpl.setNumSlotsPerReduce(getJobNumSlotsPerReduce());
+    jobInfoImpl.setNumSlotsPerMap(getJobNumSlotsPerMap());
+    return jobInfoImpl;
   }
   
+  private long JobInProgress.getJobFinishTime() {
+    long finishTime = 0;
+    if (this.isComplete()) {
+      finishTime = this.getFinishTime();
+    }
+    return finishTime;
+  }
+
+  private long JobInProgress.getJobLaunchTime() {
+    long launchTime = 0;
+    if (this.isComplete()) {
+      launchTime = this.getLaunchTime();
+    }
+    return launchTime;
+  }
+
+  private int JobInProgress.getJobNumSlotsPerReduce() {
+    int numSlotsPerReduce = 0;
+    if (this.isComplete()) {
+      numSlotsPerReduce = this.getNumSlotsPerReduce();
+    }
+    return numSlotsPerReduce;
+  }
+
+  private int JobInProgress.getJobNumSlotsPerMap() {
+    int numSlotsPerMap = 0;
+    if (this.isComplete()) {
+      numSlotsPerMap = this.getNumSlotsPerMap();
+    }
+    return numSlotsPerMap;
+  }
+
+
  private String JobInProgress.getHistoryPath() {
    String historyLoc = "";
    if(this.isComplete()) {

+ 110 - 0
src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj

@@ -25,16 +25,23 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.test.system.JTProtocol;
 import org.apache.hadoop.mapreduce.test.system.JobInfo;
 import org.apache.hadoop.mapreduce.test.system.TTInfo;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Aspect class which injects the code for {@link JobTracker} class.
@@ -43,6 +50,8 @@ import org.apache.hadoop.test.system.DaemonProtocol;
 public privileged aspect JobTrackerAspect {
 
 
+  private static JobTracker tracker;
+  
  public Configuration JobTracker.getDaemonConf() throws IOException {
    return conf;
  }
@@ -207,6 +216,7 @@ public privileged aspect JobTrackerAspect {
      tracker.LOG.warn("Unable to get the user information for the " +
          "Jobtracker");
    }
+    this.tracker = tracker;
    tracker.setReady(true);
  }
   
@@ -230,4 +240,104 @@ public privileged aspect JobTrackerAspect {
            .isJobCleanupTask()), trackers);
    return info;
  }
+  
+  /**
+   * Get the job summary details from the jobtracker log files.
+   * @param jobId - job id
+   * @param filePattern - jobtracker log file pattern.
+   * @return String - Job summary details of given job id.
+   * @throws IOException if any I/O error occurs.
+   */
+  public String JobTracker.getJobSummaryFromLogs(JobID jobId,
+      String filePattern) throws IOException {
+    String pattern = "JobId=" + jobId.toString() + ",submitTime";
+    String[] cmd = new String[] {
+                   "bash",
+                   "-c",
+                   "grep -i " 
+                 + pattern + " " 
+                 + filePattern + " " 
+                 + "| sed s/'JobSummary: '/'^'/g | cut -d'^' -f2"};
+    ShellCommandExecutor shexec = new ShellCommandExecutor(cmd);
+    shexec.execute();
+    return shexec.getOutput();
+  }
+  
+  /**
+   * Get the job summary information for given job id.
+   * @param jobId - job id.
+   * @return String - Job summary details as key value pair.
+   * @throws IOException if any I/O error occurs.
+   */
+  public String JobTracker.getJobSummaryInfo(JobID jobId) throws IOException {
+    StringBuffer jobSummary = new StringBuffer();
+    JobInProgress jip = jobs.
+        get(org.apache.hadoop.mapred.JobID.downgrade(jobId));
+    if (jip == null) {
+      LOG.warn("Job has not been found - " + jobId);
+      return null;
+    }
+    JobProfile profile = jip.getProfile();
+    JobStatus status = jip.getStatus();
+    final char[] charsToEscape = {StringUtils.COMMA, '=', 
+        StringUtils.ESCAPE_CHAR};
+    String user = StringUtils.escapeString(profile.getUser(), 
+        StringUtils.ESCAPE_CHAR, charsToEscape);
+    String queue = StringUtils.escapeString(profile.getQueueName(), 
+        StringUtils.ESCAPE_CHAR, charsToEscape);
+    Counters jobCounters = jip.getJobCounters();
+    long mapSlotSeconds = (jobCounters.getCounter(
+        JobInProgress.Counter.SLOTS_MILLIS_MAPS) + 
+        jobCounters.getCounter(JobInProgress.
+        Counter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
+    long reduceSlotSeconds = (jobCounters.getCounter(
+        JobInProgress.Counter.SLOTS_MILLIS_REDUCES) + 
+       jobCounters.getCounter(JobInProgress.
+       Counter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
+    jobSummary.append("jobId=");
+    jobSummary.append(jip.getJobID());
+    jobSummary.append(",");
+    jobSummary.append("startTime=");
+    jobSummary.append(jip.getStartTime());
+    jobSummary.append(",");
+    jobSummary.append("launchTime=");
+    jobSummary.append(jip.getLaunchTime());
+    jobSummary.append(",");
+    jobSummary.append("finishTime=");
+    jobSummary.append(jip.getFinishTime());
+    jobSummary.append(",");
+    jobSummary.append("numMaps=");
+    jobSummary.append(jip.getTasks(TaskType.MAP).length);
+    jobSummary.append(",");
+    jobSummary.append("numSlotsPerMap=");
+    jobSummary.append(jip.getNumSlotsPerMap() );
+    jobSummary.append(",");
+    jobSummary.append("numReduces=");
+    jobSummary.append(jip.getTasks(TaskType.REDUCE).length);
+    jobSummary.append(",");
+    jobSummary.append("numSlotsPerReduce=");
+    jobSummary.append(jip.getNumSlotsPerReduce());
+    jobSummary.append(",");
+    jobSummary.append("user=");
+    jobSummary.append(user);
+    jobSummary.append(",");
+    jobSummary.append("queue=");
+    jobSummary.append(queue);
+    jobSummary.append(",");
+    jobSummary.append("status=");
+    jobSummary.append(JobStatus.getJobRunState(status.getRunState()));
+    jobSummary.append(",");
+    jobSummary.append("mapSlotSeconds=");
+    jobSummary.append(mapSlotSeconds);
+    jobSummary.append(",");
+    jobSummary.append("reduceSlotsSeconds=");
+    jobSummary.append(reduceSlotSeconds);
+    jobSummary.append(",");
+    jobSummary.append("clusterMapCapacity=");
+    jobSummary.append(tracker.getClusterMetrics().getMapSlotCapacity());
+    jobSummary.append(",");
+    jobSummary.append("clusterReduceCapacity=");
+    jobSummary.append(tracker.getClusterMetrics().getReduceSlotCapacity());
+    return jobSummary.toString();
+  }
}

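For reference, the line that getJobSummaryInfo() assembles is a single comma-separated list of key=value pairs, which JTClient later splits on ',' and '='. The sketch below is purely illustrative: the key names come from the code above, but every value is made up.

    // Illustrative only -- values are invented, keys are from getJobSummaryInfo().
    String summary =
        "jobId=job_201007221557_0001,startTime=1279814227000,"
        + "launchTime=1279814228000,finishTime=1279814290000,"
        + "numMaps=4,numSlotsPerMap=1,numReduces=1,numSlotsPerReduce=1,"
        + "user=hadoopqa,queue=default,status=SUCCEEDED,"
        + "mapSlotSeconds=120,reduceSlotsSeconds=60,"
        + "clusterMapCapacity=8,clusterReduceCapacity=8";
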
+ 48 - 0
src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java

@@ -51,6 +51,10 @@ class JobInfoImpl implements JobInfo {
  private int numMaps;
  private int numReduces;
  private boolean historyCopied;
+  private long finishTime;
+  private long launchTime;
+  private int numOfSlotsPerMap;
+  private int numOfSlotsPerReduce;
 
  public JobInfoImpl() {
    id = new JobID();
@@ -164,6 +168,42 @@ class JobInfoImpl implements JobInfo {
  public boolean isHistoryFileCopied() {
    return historyCopied;
  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+  
+  public void setLaunchTime(long launchTime) {
+    this.launchTime = launchTime;
+  }
+
+  @Override
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @Override
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public void setNumSlotsPerMap(int numOfSlotsPerMap) {
+    this.numOfSlotsPerMap = numOfSlotsPerMap;
+  } 
+
+  public void setNumSlotsPerReduce(int numOfSlotsPerReduce) {
+    this.numOfSlotsPerReduce = numOfSlotsPerReduce;
+  }
+
+  @Override
+  public int getNumSlotsPerMap() {
+    return numOfSlotsPerMap;
+  }
+
+  @Override
+  public int getNumSlotsPerReduce() {
+    return numOfSlotsPerReduce;
+  }
   
  @Override
  public void readFields(DataInput in) throws IOException {
@@ -186,6 +226,10 @@ class JobInfoImpl implements JobInfo {
    numMaps = in.readInt();
    numReduces = in.readInt();
    historyCopied = in.readBoolean();
+    finishTime = in.readLong();
+    launchTime = in.readLong();
+    numOfSlotsPerMap = in.readInt();
+    numOfSlotsPerReduce = in.readInt();
  }

  @Override
@@ -209,6 +253,10 @@ class JobInfoImpl implements JobInfo {
    out.writeInt(numMaps);
    out.writeInt(numReduces);
    out.writeBoolean(historyCopied);
+    out.writeLong(finishTime);
+    out.writeLong(launchTime);
+    out.writeInt(numOfSlotsPerMap);
+    out.writeInt(numOfSlotsPerReduce);
   }
 
 

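Because JobInfoImpl crosses the test RPC boundary as a Writable, the four new fields must be read back in exactly the order they are written. A minimal round-trip sketch, not part of the patch, assuming the no-arg constructor initializes every field that write() serializes (the visible hunk only shows it starting with id = new JobID()); since the class is package-private, such a check would have to live in the org.apache.hadoop.mapred test package.

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    class JobInfoImplRoundTripSketch {
      static void roundTrip() throws IOException {
        JobInfoImpl original = new JobInfoImpl();
        original.setLaunchTime(1000L);            // made-up values
        original.setFinishTime(5000L);
        original.setNumSlotsPerMap(1);
        original.setNumSlotsPerReduce(2);

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                      // new fields are written last

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        JobInfoImpl copy = new JobInfoImpl();
        copy.readFields(in);                      // and read back in the same order

        assert copy.getFinishTime() == 5000L;
        assert copy.getNumSlotsPerReduce() == 2;
      }
    }
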
+ 28 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java

@@ -38,6 +38,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
 import org.apache.hadoop.mapred.UtilsForTests;
 import static org.junit.Assert.*;
+import java.util.HashMap;
+import java.util.StringTokenizer;
 
 /**
  * JobTracker client for system tests.
@@ -394,4 +396,30 @@ public class JTClient extends MRDaemonClient<JTProtocol> {
    }
    return (counter != 60)? true : false;
  }
+  /**
+   * Get the jobtracker log file pattern.
+   * @return String - Jobtracker log file pattern.
+   * @throws IOException - if any I/O error occurs.
+   */
+  public String getJobTrackerLogFilePattern() throws IOException  {
+    return getProxy().getFilePattern();
+  }
+
+  /**
+   * Get the job summary details of the given job id.
+   * @param jobID - job id
+   * @return HashMap - the job summary details as a map.
+   * @throws IOException if any I/O error occurs.
+   */
+  public HashMap<String,String> getJobSummary(JobID jobID)
+      throws IOException {
+    String output = getProxy().getJobSummaryInfo(jobID);
+    StringTokenizer strToken = new StringTokenizer(output,",");
+    HashMap<String,String> mapcollect = new HashMap<String,String>();
+    while(strToken.hasMoreTokens()) {
+      String keypair = strToken.nextToken();
+      mapcollect.put(keypair.split("=")[0], keypair.split("=")[1]);
+    }
+    return mapcollect;
+  }
}

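A sketch of how a system test might consume the two new JTClient calls. How the JTClient handle and the JobID are obtained is outside this patch (they are just parameters here); the key names follow the summary string built in JobTrackerAspect.getJobSummaryInfo().

    import java.io.IOException;
    import java.util.HashMap;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.test.system.JTClient;

    class JobSummaryUsageSketch {
      static void verifySummary(JTClient jtClient, JobID jobId) throws IOException {
        HashMap<String, String> summary = jtClient.getJobSummary(jobId);
        int numMaps = Integer.parseInt(summary.get("numMaps"));
        String runState = summary.get("status");   // e.g. SUCCEEDED
        String queue = summary.get("queue");
        System.out.println(numMaps + " maps, status=" + runState
            + ", queue=" + queue);

        // Pattern of the jobtracker log files, usable with the
        // log-scraping RPC added by this patch.
        String logPattern = jtClient.getJobTrackerLogFilePattern();
        System.out.println("JT log pattern: " + logPattern);
      }
    }
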
+ 18 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java

@@ -126,4 +126,22 @@ public interface JTProtocol extends DaemonProtocol {
   * @throws IOException is thrown in case of RPC error
   */
  public boolean isBlackListed(String trackerID) throws IOException;
+
+  /**
+   * Get the job summary details from the jobtracker log files.
+   * @param jobId - job id
+   * @param filePattern - jobtracker log file pattern.
+   * @return String - the job summary details
+   * @throws IOException if any I/O error occurs.
+   */
+  public String getJobSummaryFromLog(JobID jobId, String filePattern)
+    throws IOException;
+
+  /**
+   * Get the job summary information of the given job id.
+   * @param jobId - job id.
+   * @return String - the job summary details as a map.
+   * @throws IOException if any I/O error occurs.
+   */
+  public String getJobSummaryInfo(JobID jobId) throws IOException;
}

+ 21 - 1
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java

@@ -136,4 +136,24 @@ public interface JobInfo extends Writable {
   * @return true if history file copied.
   */
  boolean isHistoryFileCopied();
-}
+  /**
+   * Get the launch time of a job.
+   * @return long - launch time for a job.
+   */
+  long getLaunchTime();
+  /**
+   * Get the finish time of a job.
+   * @return long - finish time for a job.
+   */
+  long getFinishTime();
+  /**
+   * Get the number of slots per map.
+   * @return int - number of slots per map.
+   */
+  int getNumSlotsPerMap();
+  /**
+   * Get the number of slots per reduce.
+   * @return int - number of slots per reduce.
+   */
+  int getNumSlotsPerReduce();
+}
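
The JobInProgressAspect above only fills the new timing and slot fields once the job is complete, so callers should treat 0 as "not available yet". A small, hypothetical consumer illustrating that convention:

    import org.apache.hadoop.mapreduce.test.system.JobInfo;

    class JobTimingSketch {
      /** Wall-clock run time in milliseconds, or -1 if the job has not finished. */
      static long runTimeMillis(JobInfo info) {
        long launch = info.getLaunchTime();
        long finish = info.getFinishTime();
        if (launch == 0 || finish == 0) {
          return -1;            // populated only after the job completes
        }
        return finish - launch;
      }
    }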