Browse Source

HADOOP-4843. Collect job history and configuration in Chukwa. Contributed by Eric Yang

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@735219 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas 16 years ago
parent
commit
f64ed69124

+ 3 - 0
CHANGES.txt

@@ -302,6 +302,9 @@ Release 0.20.0 - Unreleased
     BlockCompressorStream, and BlockDecompressorStream public to facilitate 
     non-Hadoop codecs. (omalley)
 
+    HADOOP-4843. Collect job history and configuration in Chukwa. (Eric Yang
+    via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

+ 3 - 1
src/contrib/chukwa/build.xml

@@ -276,7 +276,7 @@
 		<mkdir dir="${build.dir}"/>
 		<mkdir dir="${build.classes}"/>
 		<mkdir dir="${build.dir}/test"/>
-		<javac srcdir="src/java/org/apache/hadoop/chukwa" destdir="${build.classes}" excludes="**/ChukwaTTInstru.java" debug="${javac.debug}">
+		<javac srcdir="src/java/org/apache/hadoop" destdir="${build.classes}" excludes="**/ChukwaTTInstru.java" debug="${javac.debug}">
 			<classpath refid="classpath" />
 		</javac>
 	</target>
@@ -446,6 +446,7 @@
 
 		<jar jarfile="${build.dir}/chukwa-hadoop-${chukwaVersion}-client.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/inputtools/log4j/**/*.class">
 			<fileset dir="${basedir}/src/java">
+				<include name="org/apache/hadoop/mapred/**/*.java"/>
 				<include name="org/apache/hadoop/chukwa/inputtools/log4j/**/*.java"/>
 		                <include name="org/apache/hadoop/chukwa/datacollection/client/**/*.java"/>
 		                <include name="org/apache/hadoop/chukwa/util/**/*.java"/>
@@ -454,6 +455,7 @@
 		                <include name="chukwa-hadoop-metrics-log4j.properties"/>
 			</fileset>
 			<fileset dir="${build.classes}">
+				<include name="org/apache/hadoop/mapred/**/*.class"/>
 				<include name="org/apache/hadoop/chukwa/datacollection/client/**/*.class"/>
 				<include name="org/apache/hadoop/chukwa/util/**/*.class"/>
 				<include name="org/apache/hadoop/chukwa/datacollection/controller/*.class"/>

+ 104 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/mapred/ChukwaJobTrackerInstrumentation.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.fs.Path;
+
+public class ChukwaJobTrackerInstrumentation extends org.apache.hadoop.mapred.JobTrackerInstrumentation {
+
+	  protected final JobTracker tracker;
+	  private static ChukwaAgentController chukwaClient = null;
+	  private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
+	  private static HashMap<JobID, Long> jobConfs = null;
+	  private static HashMap<JobID, Long> jobHistories = null;
+
+	  public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+          super(jt,conf);
+	      tracker = jt;
+	      if(chukwaClient==null) {
+		      chukwaClient = new ChukwaAgentController();
+	      }
+	      if(jobConfs==null) {
+	    	  jobConfs = new HashMap<JobID, Long>();
+	      }
+	      if(jobHistories==null) {
+	    	  jobHistories = new HashMap<JobID, Long>();
+	      }
+	  }
+
+	  public void launchMap(TaskAttemptID taskAttemptID) {
+		  
+	  }
+
+	  public void completeMap(TaskAttemptID taskAttemptID) {
+		  
+	  }
+
+	  public void launchReduce(TaskAttemptID taskAttemptID) {
+		  
+	  }
+
+	  public void completeReduce(TaskAttemptID taskAttemptID) {
+		  
+	  }
+
+	  public void submitJob(JobConf conf, JobID id) {
+          String chukwaJobConf = tracker.getLocalJobFilePath(id);
+          try {
+              String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+              Path jobHistoryPath = JobHistory.JobInfo.getJobHistoryLogLocation(jobFileName);
+              String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
+              long adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobConf", "0 "+jobConfPath, 0);
+              jobConfs.put(id, adaptorID);
+              if(jobHistoryPath.toString().matches("^hdfs://")) {
+                  adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor", "JobHistory", "0 "+jobHistoryPath.toString(), 0);
+              } else {
+                  adaptorID = chukwaClient.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped", "JobHistory", "0 "+jobHistoryPath.toString().substring(5), 0);            	  
+              }
+              jobHistories.put(id, adaptorID);
+          } catch(Exception ex) {
+        	  
+          }
+      }
+
+	  public void completeJob(JobConf conf, JobID id) {
+          try {
+             if (jobHistories.containsKey(id)) {
+                 chukwaClient.remove(jobHistories.get(id));
+             }
+             if (jobConfs.containsKey(id)) {
+                 chukwaClient.remove(jobConfs.get(id));
+             }
+          } catch(Throwable e) {
+            log.warn("could not remove adaptor for this job: " + id.toString(),e);
+            e.printStackTrace();
+          }
+	  }
+
+}